From f936c8461ae8fbbb31f455bba9be15e177ff7c47 Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Thu, 2 Oct 2025 08:52:04 +0000 Subject: [PATCH 1/2] Merge pull request #87303 from ClickHouse/row_policy_prewhere_with_optimize_2 Fix condition not being moved to PREWHERE in case there is a row policy (version 2) --- src/Formats/FormatFactory.cpp | 5 +- src/Formats/FormatFilterInfo.cpp | 36 ++-- src/Formats/FormatFilterInfo.h | 10 +- src/Interpreters/ExpressionAnalyzer.cpp | 12 +- src/Interpreters/ExpressionAnalyzer.h | 6 +- src/Interpreters/InterpreterSelectQuery.cpp | 126 +++++--------- src/Interpreters/InterpreterSelectQuery.h | 2 +- .../getHeaderForProcessingStage.cpp | 2 +- src/Planner/PlannerJoinTree.cpp | 63 ++----- .../Formats/Impl/Parquet/ReadManager.cpp | 2 +- .../Formats/Impl/Parquet/Reader.cpp | 43 +++-- .../optimizeLazyMaterialization.cpp | 8 +- .../Optimizations/optimizePrewhere.cpp | 9 +- .../optimizePrimaryKeyConditionAndLimit.cpp | 7 +- .../Optimizations/projectionsCommon.cpp | 17 +- .../QueryPlan/ReadFromMergeTree.cpp | 164 ++++++++++-------- .../QueryPlan/ReadFromObjectStorageStep.cpp | 12 +- .../QueryPlan/SourceStepWithFilter.cpp | 121 +++++++------ .../QueryPlan/SourceStepWithFilter.h | 8 +- src/Storages/IStorage.cpp | 5 - .../MergeTree/MergeTreeBlockReadUtils.cpp | 5 +- .../MergeTree/MergeTreeBlockReadUtils.h | 1 + .../MergeTree/MergeTreePrefetchedReadPool.cpp | 2 + .../MergeTree/MergeTreePrefetchedReadPool.h | 1 + src/Storages/MergeTree/MergeTreeReadPool.cpp | 2 + src/Storages/MergeTree/MergeTreeReadPool.h | 1 + .../MergeTree/MergeTreeReadPoolBase.cpp | 3 + .../MergeTree/MergeTreeReadPoolBase.h | 2 + .../MergeTree/MergeTreeReadPoolInOrder.cpp | 2 + .../MergeTree/MergeTreeReadPoolInOrder.h | 1 + .../MergeTreeReadPoolParallelReplicas.cpp | 2 + .../MergeTreeReadPoolParallelReplicas.h | 1 + ...rgeTreeReadPoolParallelReplicasInOrder.cpp | 2 + ...MergeTreeReadPoolParallelReplicasInOrder.h | 1 + .../MergeTree/MergeTreeSelectProcessor.cpp | 73 ++++---- .../MergeTree/MergeTreeSelectProcessor.h | 6 +- .../DataLakes/Iceberg/Compaction.cpp | 2 +- .../DataLakes/Iceberg/Mutations.cpp | 2 +- .../Iceberg/PositionDeleteTransform.cpp | 2 +- .../ObjectStorage/StorageObjectStorage.cpp | 6 +- .../StorageObjectStorageSource.cpp | 2 +- src/Storages/SelectQueryInfo.cpp | 64 +++---- src/Storages/SelectQueryInfo.h | 16 +- src/Storages/StorageBuffer.cpp | 62 ++++--- src/Storages/StorageDummy.cpp | 2 +- src/Storages/StorageFile.cpp | 10 +- src/Storages/StorageURL.cpp | 18 +- src/Storages/prepareReadingFromFormat.cpp | 8 +- src/Storages/prepareReadingFromFormat.h | 6 +- ...n_merge_tree_prewhere_row_policy.reference | 4 +- ...591_optimize_prewhere_row_policy.reference | 144 +++++++++++++++ .../03591_optimize_prewhere_row_policy.sql | 34 ++++ 52 files changed, 671 insertions(+), 474 deletions(-) create mode 100644 tests/queries/0_stateless/03591_optimize_prewhere_row_policy.reference create mode 100644 tests/queries/0_stateless/03591_optimize_prewhere_row_policy.sql diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 4b955510a608..7fd8c20a4a75 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -399,8 +399,9 @@ InputFormatPtr FormatFactory::getInput( const FormatSettings format_settings = _format_settings ? *_format_settings : getFormatSettings(context); const Settings & settings = context->getSettingsRef(); - if (format_filter_info && format_filter_info->prewhere_info && (!creators.random_access_input_creator || !creators.prewhere_support_checker || !creators.prewhere_support_checker(format_settings))) - throw Exception(ErrorCodes::LOGICAL_ERROR, "PREWHERE passed to format that doesn't support it"); + if (format_filter_info && (format_filter_info->prewhere_info || format_filter_info->row_level_filter) && (!creators.random_access_input_creator || !creators.prewhere_support_checker || !creators.prewhere_support_checker(format_settings))) + throw Exception(ErrorCodes::LOGICAL_ERROR, "{} passed to format that doesn't support it", + format_filter_info->prewhere_info ? "PREWHERE" : "ROW LEVEL FILTER"); if (!parser_shared_resources) parser_shared_resources = std::make_shared( diff --git a/src/Formats/FormatFilterInfo.cpp b/src/Formats/FormatFilterInfo.cpp index 8d25deefb397..677f93a5a17f 100644 --- a/src/Formats/FormatFilterInfo.cpp +++ b/src/Formats/FormatFilterInfo.cpp @@ -43,19 +43,21 @@ std::pair, std::unordered_map return {clickhouse_to_parquet_names, parquet_names_to_clickhouse}; } - FormatFilterInfo::FormatFilterInfo(std::shared_ptr filter_actions_dag_, const ContextPtr & context_, ColumnMapperPtr column_mapper_) - : filter_actions_dag(filter_actions_dag_) - , context(context_) - , column_mapper(column_mapper_) - { - } +FormatFilterInfo::FormatFilterInfo( + std::shared_ptr filter_actions_dag_, + const ContextPtr & context_, + ColumnMapperPtr column_mapper_, + FilterDAGInfoPtr row_level_filter_, + PrewhereInfoPtr prewhere_info_) + : filter_actions_dag(filter_actions_dag_) + , context(context_) + , row_level_filter(std::move(row_level_filter_)) + , prewhere_info(std::move(prewhere_info_)) + , column_mapper(column_mapper_) +{ +} - FormatFilterInfo::FormatFilterInfo() - : filter_actions_dag(nullptr) - , context(static_cast(nullptr)) - , column_mapper(nullptr) - { - } +FormatFilterInfo::FormatFilterInfo() = default; bool FormatFilterInfo::hasFilter() const @@ -73,7 +75,7 @@ void FormatFilterInfo::initKeyCondition(const Block & keys) if (!ctx) throw Exception(ErrorCodes::LOGICAL_ERROR, "Context has expired"); - if (prewhere_info) + if (prewhere_info || row_level_filter) { auto add_columns = [&](const ActionsDAG & dag) { @@ -83,9 +85,11 @@ void FormatFilterInfo::initKeyCondition(const Block & keys) additional_columns.insert({col.type->createColumn(), col.type, col.name}); } }; - if (prewhere_info->row_level_filter.has_value()) - add_columns(prewhere_info->row_level_filter.value()); - add_columns(prewhere_info->prewhere_actions); + + if (row_level_filter) + add_columns(row_level_filter->actions); + if (prewhere_info) + add_columns(prewhere_info->prewhere_actions); } ColumnsWithTypeAndName columns = keys.getColumnsWithTypeAndName(); diff --git a/src/Formats/FormatFilterInfo.h b/src/Formats/FormatFilterInfo.h index 3d37a95c93b2..f2b3c9688887 100644 --- a/src/Formats/FormatFilterInfo.h +++ b/src/Formats/FormatFilterInfo.h @@ -12,6 +12,8 @@ struct Settings; class KeyCondition; struct PrewhereInfo; using PrewhereInfoPtr = std::shared_ptr; +struct FilterDAGInfo; +using FilterDAGInfoPtr = std::shared_ptr; /// Some formats needs to custom mapping between columns in file and clickhouse columns. class ColumnMapper @@ -41,12 +43,18 @@ using FormatFilterInfoPtr = std::shared_ptr; /// because most implementations don't use most of this struct. struct FormatFilterInfo { - FormatFilterInfo(std::shared_ptr filter_actions_dag_, const ContextPtr & context_, ColumnMapperPtr column_mapper_); + FormatFilterInfo( + std::shared_ptr filter_actions_dag_, + const ContextPtr & context_, + ColumnMapperPtr column_mapper_, + FilterDAGInfoPtr row_level_filter_, + PrewhereInfoPtr prewhere_info_); FormatFilterInfo(); std::shared_ptr filter_actions_dag; ContextWeakPtr context; // required only if `filter_actions_dag` is set + FilterDAGInfoPtr row_level_filter; PrewhereInfoPtr prewhere_info; // assigned only if the format supports prewhere /// Optionally created from filter_actions_dag, if the format needs it. diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index a74a0484d0eb..9fd9911b9cc2 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -1936,7 +1936,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( bool first_stage_, bool second_stage_, bool only_types, - const FilterDAGInfoPtr & filter_info_, + const FilterDAGInfoPtr & row_policy_info_, const FilterDAGInfoPtr & additional_filter, const Block & source_header) : first_stage(first_stage_) @@ -2034,10 +2034,10 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( columns_for_additional_filter.begin(), columns_for_additional_filter.end()); } - if (storage && filter_info_) + if (storage && row_policy_info_) { - filter_info = filter_info_; - filter_info->do_remove_column = true; + row_policy_info = row_policy_info_; + row_policy_info->do_remove_column = true; } if (prewhere_dag_and_flags = query_analyzer.appendPrewhere(chain, !first_stage); prewhere_dag_and_flags) @@ -2376,9 +2376,9 @@ std::string ExpressionAnalysisResult::dump() const ss << "prewhere_info " << prewhere_info->dump() << "\n"; } - if (filter_info) + if (row_policy_info) { - ss << "filter_info " << filter_info->dump() << "\n"; + ss << "filter_info " << row_policy_info->dump() << "\n"; } if (before_aggregation) diff --git a/src/Interpreters/ExpressionAnalyzer.h b/src/Interpreters/ExpressionAnalyzer.h index 9665bb1c32e1..e8eea1fbf635 100644 --- a/src/Interpreters/ExpressionAnalyzer.h +++ b/src/Interpreters/ExpressionAnalyzer.h @@ -273,7 +273,7 @@ struct ExpressionAnalysisResult NameSet columns_to_remove_after_prewhere; PrewhereInfoPtr prewhere_info; - FilterDAGInfoPtr filter_info; + FilterDAGInfoPtr row_policy_info; ConstantFilterDescription prewhere_constant_filter_description; ConstantFilterDescription where_constant_filter_description; /// Actions by every element of ORDER BY @@ -288,12 +288,12 @@ struct ExpressionAnalysisResult bool first_stage, bool second_stage, bool only_types, - const FilterDAGInfoPtr & filter_info, + const FilterDAGInfoPtr & row_policy_info, const FilterDAGInfoPtr & additional_filter, /// for setting additional_filters const Block & source_header); /// Filter for row-level security. - bool hasFilter() const { return filter_info.get(); } + bool hasRowPolicyFilter() const { return row_policy_info.get(); } bool hasJoin() const { return join.get(); } bool hasPrewhere() const { return prewhere_info.get(); } diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 684922d596ce..f93e70002de9 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -894,7 +894,7 @@ InterpreterSelectQuery::InterpreterSelectQuery( /// Fix source_header for filter actions. if (row_policy_filter && !row_policy_filter->empty()) { - filter_info = generateFilterActions( + row_policy_info = generateFilterActions( table_id, row_policy_filter->expression, context, storage, storage_snapshot, metadata_snapshot, required_columns, prepared_sets); @@ -1063,8 +1063,6 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis() max_rows = max_rows ? std::min(max_rows, settings[Setting::max_rows_to_read].value) : settings[Setting::max_rows_to_read]; query_info_copy.trivial_limit = max_rows; - /// Apply filters to prewhere and add them to the query_info so we can filter out parts efficiently during row estimation - applyFiltersToPrewhereInAnalysis(analysis_copy); if (analysis_copy.prewhere_info) { query_info_copy.prewhere_info = analysis_copy.prewhere_info; @@ -1080,13 +1078,13 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis() = query_info_copy.prewhere_info->prewhere_actions.findInOutputs(query_info_copy.prewhere_info->prewhere_column_name); added_filter_nodes.nodes.push_back(&node); } + } - if (query_info_copy.prewhere_info->row_level_filter) - { - const auto & node - = query_info_copy.prewhere_info->row_level_filter->findInOutputs(query_info_copy.prewhere_info->row_level_column_name); - added_filter_nodes.nodes.push_back(&node); - } + if (query_info_copy.row_level_filter) + { + const auto & node + = query_info_copy.row_level_filter->actions.findInOutputs(query_info_copy.row_level_filter->column_name); + added_filter_nodes.nodes.push_back(&node); } if (auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes)) @@ -1189,7 +1187,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl() && options.to_stage > QueryProcessingStage::WithMergeableState; analysis_result = ExpressionAnalysisResult( - *query_analyzer, metadata_snapshot, first_stage, second_stage, options.only_analyze, filter_info, additional_filter_info, *source_header); + *query_analyzer, metadata_snapshot, first_stage, second_stage, options.only_analyze, row_policy_info, additional_filter_info, *source_header); if (options.to_stage == QueryProcessingStage::Enum::FetchColumns) { @@ -1632,13 +1630,13 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

(source_header); query_plan.addStep(std::move(read_nothing)); - if (expressions.filter_info) + if (expressions.row_policy_info) { auto row_level_security_step = std::make_unique( query_plan.getCurrentHeader(), - expressions.filter_info->actions.clone(), - expressions.filter_info->column_name, - expressions.filter_info->do_remove_column); + expressions.row_policy_info->actions.clone(), + expressions.row_policy_info->column_name, + expressions.row_policy_info->do_remove_column); row_level_security_step->setStepDescription("Row-level security filter"); query_plan.addStep(std::move(row_level_security_step)); @@ -1646,18 +1644,6 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

row_level_filter) - { - auto row_level_filter_step = std::make_unique( - query_plan.getCurrentHeader(), - expressions.prewhere_info->row_level_filter->clone(), - expressions.prewhere_info->row_level_column_name, - true); - - row_level_filter_step->setStepDescription("Row-level security filter (PREWHERE)"); - query_plan.addStep(std::move(row_level_filter_step)); - } - auto prewhere_step = std::make_unique( query_plan.getCurrentHeader(), expressions.prewhere_info->prewhere_actions.clone(), @@ -1759,13 +1745,13 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

supportsPrewhere())) { auto row_level_security_step = std::make_unique( query_plan.getCurrentHeader(), - expressions.filter_info->actions.clone(), - expressions.filter_info->column_name, - expressions.filter_info->do_remove_column); + expressions.row_policy_info->actions.clone(), + expressions.row_policy_info->column_name, + expressions.row_policy_info->do_remove_column); row_level_security_step->setStepDescription("Row-level security filter"); query_plan.addStep(std::move(row_level_security_step)); @@ -2206,21 +2192,21 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, c { Pipe pipe(std::make_shared(std::make_shared(source_header))); - if (query_info.prewhere_info) + if (query_info.row_level_filter) { - auto & prewhere_info = *query_info.prewhere_info; - - if (prewhere_info.row_level_filter) + auto row_level_actions = std::make_shared(query_info.row_level_filter->actions.clone()); + pipe.addSimpleTransform([&](const SharedHeader & header) { - auto row_level_actions = std::make_shared(prewhere_info.row_level_filter->clone()); - pipe.addSimpleTransform([&](const SharedHeader & header) - { - return std::make_shared(header, - row_level_actions, - prewhere_info.row_level_column_name, true); - }); - } + return std::make_shared(header, + row_level_actions, + query_info.row_level_filter->column_name, + query_info.row_level_filter->do_remove_column); + }); + } + if (query_info.prewhere_info) + { + auto & prewhere_info = *query_info.prewhere_info; auto filter_actions = std::make_shared(prewhere_info.prewhere_actions.clone()); pipe.addSimpleTransform([&](const SharedHeader & header) { @@ -2259,38 +2245,9 @@ bool InterpreterSelectQuery::shouldMoveToPrewhere() const return settings[Setting::optimize_move_to_prewhere] && (!query.final() || settings[Setting::optimize_move_to_prewhere_if_final]); } -/// Note that this is const and accepts the analysis ref to be able to use it to do analysis for parallel replicas -/// without affecting the final analysis multiple times -void InterpreterSelectQuery::applyFiltersToPrewhereInAnalysis(ExpressionAnalysisResult & analysis) const -{ - if (!analysis.filter_info) - return; - - if (!analysis.prewhere_info) - { - const bool does_storage_support_prewhere = !input_pipe && storage && storage->supportsPrewhere(); - if (does_storage_support_prewhere && shouldMoveToPrewhere()) - { - /// Execute row level filter in prewhere as a part of "move to prewhere" optimization. - analysis.prewhere_info = std::make_shared(std::move(analysis.filter_info->actions), analysis.filter_info->column_name); - analysis.prewhere_info->remove_prewhere_column = std::move(analysis.filter_info->do_remove_column); - analysis.prewhere_info->need_filter = true; - analysis.filter_info = nullptr; - } - } - else - { - /// Add row level security actions to prewhere. - analysis.prewhere_info->row_level_filter = std::move(analysis.filter_info->actions); - analysis.prewhere_info->row_level_column_name = std::move(analysis.filter_info->column_name); - analysis.filter_info = nullptr; - } -} - - void InterpreterSelectQuery::addPrewhereAliasActions() { - applyFiltersToPrewhereInAnalysis(analysis_result); + auto & row_level_filter = analysis_result.row_policy_info; auto & prewhere_info = analysis_result.prewhere_info; auto & columns_to_remove_after_prewhere = analysis_result.columns_to_remove_after_prewhere; @@ -2317,12 +2274,12 @@ void InterpreterSelectQuery::addPrewhereAliasActions() /// Get some columns directly from PREWHERE expression actions auto prewhere_required_columns = prewhere_info->prewhere_actions.getRequiredColumns().getNames(); columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end()); + } - if (prewhere_info->row_level_filter) - { - auto row_level_required_columns = prewhere_info->row_level_filter->getRequiredColumns().getNames(); - columns.insert(row_level_required_columns.begin(), row_level_required_columns.end()); - } + if (row_level_filter) + { + auto row_level_required_columns = row_level_filter->actions.getRequiredColumns().getNames(); + columns.insert(row_level_required_columns.begin(), row_level_required_columns.end()); } return columns; @@ -2480,13 +2437,15 @@ std::optional InterpreterSelectQuery::getTrivialCount(UInt64 allow_exper // It's possible to optimize count() given only partition predicates ActionsDAG::NodeRawConstPtrs filter_nodes; + if (analysis_result.hasRowPolicyFilter()) + { + auto & row_level_filter = analysis_result.row_policy_info; + filter_nodes.push_back(&row_level_filter->actions.findInOutputs(row_level_filter->column_name)); + } if (analysis_result.hasPrewhere()) { auto & prewhere_info = analysis_result.prewhere_info; filter_nodes.push_back(&prewhere_info->prewhere_actions.findInOutputs(prewhere_info->prewhere_column_name)); - - if (prewhere_info->row_level_filter) - filter_nodes.push_back(&prewhere_info->row_level_filter->findInOutputs(prewhere_info->row_level_column_name)); } if (analysis_result.hasWhere()) { @@ -2677,10 +2636,11 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc if (max_streams == 0) max_streams = 1; - auto & prewhere_info = analysis_result.prewhere_info; + if (analysis_result.row_policy_info && (!input_pipe && storage && storage->supportsPrewhere())) + query_info.row_level_filter = analysis_result.row_policy_info; - if (prewhere_info) - query_info.prewhere_info = prewhere_info; + if (analysis_result.prewhere_info) + query_info.prewhere_info = analysis_result.prewhere_info; bool optimize_read_in_order = analysis_result.optimize_read_in_order; bool optimize_aggregation_in_order = analysis_result.optimize_read_in_order && !query_analyzer->useGroupingSetKey(); diff --git a/src/Interpreters/InterpreterSelectQuery.h b/src/Interpreters/InterpreterSelectQuery.h index af85ac4b9f30..18084cc43ee0 100644 --- a/src/Interpreters/InterpreterSelectQuery.h +++ b/src/Interpreters/InterpreterSelectQuery.h @@ -220,7 +220,7 @@ class InterpreterSelectQuery : public IInterpreterUnionOrSelectQuery ExpressionAnalysisResult analysis_result; /// For row-level security. RowPolicyFilterPtr row_policy_filter; - FilterDAGInfoPtr filter_info; + FilterDAGInfoPtr row_policy_info; /// For additional_filter setting. FilterDAGInfoPtr additional_filter_info; diff --git a/src/Interpreters/getHeaderForProcessingStage.cpp b/src/Interpreters/getHeaderForProcessingStage.cpp index c686e3f042dd..bb1eec0ba044 100644 --- a/src/Interpreters/getHeaderForProcessingStage.cpp +++ b/src/Interpreters/getHeaderForProcessingStage.cpp @@ -104,7 +104,7 @@ SharedHeader getHeaderForProcessingStage( case QueryProcessingStage::FetchColumns: { Block header = storage_snapshot->getSampleBlockForColumns(column_names); - header = SourceStepWithFilter::applyPrewhereActions(header, query_info.prewhere_info); + header = SourceStepWithFilter::applyPrewhereActions(header, query_info.row_level_filter, query_info.prewhere_info); return std::make_shared(std::move(header)); } case QueryProcessingStage::WithMergeableState: diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index dff3ebfb14c2..97051d776018 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -576,6 +576,7 @@ std::optional buildCustomKeyFilterIfNeeded(const StoragePtr & sto std::optional buildAdditionalFiltersIfNeeded(const StoragePtr & storage, const String & table_expression_alias, SelectQueryInfo & table_expression_query_info, + const PrewhereInfoPtr & prewhere_info, PlannerContextPtr & planner_context) { const auto & query_context = planner_context->getQueryContext(); @@ -615,7 +616,13 @@ std::optional buildAdditionalFiltersIfNeeded(const StoragePtr & s return {}; table_expression_query_info.additional_filter_ast = additional_filter_ast; - return buildFilterInfo(additional_filter_ast, table_expression_query_info.table_expression, planner_context); + auto filter_info = buildFilterInfo(additional_filter_ast, table_expression_query_info.table_expression, planner_context); + if (prewhere_info) + { + for (const auto * input : filter_info.actions.getInputs()) + prewhere_info->prewhere_actions.tryRestoreColumn(input->result_name); + } + return filter_info; } UInt64 mainQueryNodeBlockSizeByLimit(const SelectQueryInfo & select_query_info) @@ -869,6 +876,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres { if (!select_query_options.only_analyze) { + auto & row_level_filter = table_expression_query_info.row_level_filter; auto & prewhere_info = table_expression_query_info.prewhere_info; const auto & prewhere_actions = table_expression_data.getPrewhereFilterActions(); const auto & columns_names = table_expression_data.getColumnNames(); @@ -897,52 +905,17 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres updatePrewhereOutputsIfNeeded(table_expression_query_info, table_expression_data.getColumnNames(), storage_snapshot); - const auto add_filter = [&](FilterDAGInfo & filter_info, std::string description) - { - bool is_final = table_expression_query_info.table_expression_modifiers - && table_expression_query_info.table_expression_modifiers->hasFinal(); - bool optimize_move_to_prewhere - = settings[Setting::optimize_move_to_prewhere] && (!is_final || settings[Setting::optimize_move_to_prewhere_if_final]); - - auto supported_prewhere_columns = storage->supportedPrewhereColumns(); - bool has_table_virtual_column = - filter_info.column_name == "_table" && storage->isVirtualColumn(filter_info.column_name, storage_snapshot->metadata); - if (!select_query_options.build_logical_plan && storage->canMoveConditionsToPrewhere() && optimize_move_to_prewhere - && (!supported_prewhere_columns || supported_prewhere_columns->contains(filter_info.column_name)) - && !has_table_virtual_column) - { - if (!prewhere_info) - { - prewhere_info = std::make_shared(); - prewhere_info->prewhere_actions = std::move(filter_info.actions); - prewhere_info->prewhere_column_name = filter_info.column_name; - prewhere_info->remove_prewhere_column = filter_info.do_remove_column; - prewhere_info->need_filter = true; - } - else if (!prewhere_info->row_level_filter) - { - prewhere_info->row_level_filter = std::move(filter_info.actions); - prewhere_info->row_level_column_name = filter_info.column_name; - prewhere_info->need_filter = true; - } - else - { - where_filters.emplace_back(std::move(filter_info), std::move(description)); - } - - } - else - { - where_filters.emplace_back(std::move(filter_info), std::move(description)); - } - }; - auto row_policy_filter_info = buildRowPolicyFilterIfNeeded(storage, table_expression_query_info, planner_context, used_row_policies); if (row_policy_filter_info) { table_expression_data.setRowLevelFilterActions(row_policy_filter_info->actions.clone()); - add_filter(*row_policy_filter_info, "Row-level security filter"); + + /// TODO: Never put row-level security filter in WHERE clause for storages that do not support PREWHERE to avoid merging of filters. + if (storage->supportsPrewhere()) + row_level_filter = std::make_shared(std::move(*row_policy_filter_info)); + else + where_filters.emplace_back(std::move(*row_policy_filter_info), "Row-level security filter"); } if (query_context->canUseParallelReplicasCustomKey()) @@ -950,7 +923,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres if (settings[Setting::parallel_replicas_count] > 1) { if (auto parallel_replicas_custom_key_filter_info= buildCustomKeyFilterIfNeeded(storage, table_expression_query_info, planner_context)) - add_filter(*parallel_replicas_custom_key_filter_info, "Parallel replicas custom key filter"); + where_filters.emplace_back(std::move(*parallel_replicas_custom_key_filter_info), "Parallel replicas custom key filter"); } else if (auto * distributed = typeid_cast(storage.get()); distributed && query_context->canUseParallelReplicasCustomKeyForCluster(*distributed->getCluster())) @@ -965,10 +938,10 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres } const auto & table_expression_alias = table_expression->getOriginalAlias(); - if (auto additional_filters_info = buildAdditionalFiltersIfNeeded(storage, table_expression_alias, table_expression_query_info, planner_context)) + if (auto additional_filters_info = buildAdditionalFiltersIfNeeded(storage, table_expression_alias, table_expression_query_info, prewhere_info, planner_context)) { appendSetsFromActionsDAG(additional_filters_info->actions, useful_sets); - add_filter(*additional_filters_info, "additional filter"); + where_filters.emplace_back(std::move(*additional_filters_info), "additional filter"); } if (!select_query_options.build_logical_plan) diff --git a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp index 0c83f9a03c0e..0896c1273d22 100644 --- a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp +++ b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp @@ -58,7 +58,7 @@ void ReadManager::init(FormatParserSharedResourcesPtr parser_shared_resources_) /// eat all memory, and MainData would have to execute in one thread to minimize memory usage. double sum = 0; stages[size_t(ReadStage::MainData)].memory_target_fraction *= 10; - if (reader.format_filter_info->prewhere_info) + if (reader.format_filter_info->prewhere_info || reader.format_filter_info->row_level_filter) stages[size_t(ReadStage::PrewhereData)].memory_target_fraction *= 5; else { diff --git a/src/Processors/Formats/Impl/Parquet/Reader.cpp b/src/Processors/Formats/Impl/Parquet/Reader.cpp index 924c337f0aa5..2b24648a6d18 100644 --- a/src/Processors/Formats/Impl/Parquet/Reader.cpp +++ b/src/Processors/Formats/Impl/Parquet/Reader.cpp @@ -268,10 +268,13 @@ void Reader::prefilterAndInitRowGroups() for (const auto & col : format_filter_info->additional_columns) extended_sample_block.insert(col); extended_sample_block_data_types = extended_sample_block.getDataTypes(); - PrewhereInfoPtr prewhere_info = format_filter_info->prewhere_info; + const auto & row_level_filter = format_filter_info->row_level_filter; + const auto & prewhere_info = format_filter_info->prewhere_info; /// Process schema. SchemaConverter schemer(file_metadata, options, &extended_sample_block); + if (row_level_filter && !row_level_filter->do_remove_column) + schemer.external_columns.push_back(row_level_filter->column_name); if (prewhere_info && !prewhere_info->remove_prewhere_column) schemer.external_columns.push_back(prewhere_info->prewhere_column_name); schemer.prepareForReading(); @@ -451,7 +454,7 @@ void Reader::prepareBloomFilterCondition() void Reader::initializePrefetches() { - bool use_offset_index = options.format.parquet.use_offset_index || format_filter_info->prewhere_info + bool use_offset_index = options.format.parquet.use_offset_index || format_filter_info->prewhere_info || format_filter_info->row_level_filter || std::any_of(primitive_columns.begin(), primitive_columns.end(), [](const auto & c) { return c.column_index_condition; }); bool need_to_find_bloom_filter_lengths_the_hard_way = false; @@ -597,8 +600,9 @@ void Reader::initializePrefetches() void Reader::preparePrewhere() { - PrewhereInfoPtr prewhere_info = format_filter_info->prewhere_info; - if (!prewhere_info) + const auto & row_level_filter = format_filter_info->row_level_filter; + const auto & prewhere_info = format_filter_info->prewhere_info; + if (!prewhere_info && !row_level_filter) return; /// TODO [parquet]: We currently run prewhere after reading all prewhere columns of the row @@ -609,25 +613,30 @@ void Reader::preparePrewhere() /// Convert ActionsDAG to ExpressionActions. ExpressionActionsSettings actions_settings; - if (prewhere_info->row_level_filter.has_value()) + if (row_level_filter) { - ExpressionActions actions(prewhere_info->row_level_filter->clone(), actions_settings); + ExpressionActions actions(row_level_filter->actions.clone(), actions_settings); prewhere_steps.push_back(PrewhereStep - { - .actions = std::move(actions), - .result_column_name = prewhere_info->row_level_column_name - }); - } - ExpressionActions actions(prewhere_info->prewhere_actions.clone(), actions_settings); - prewhere_steps.push_back(PrewhereStep { .actions = std::move(actions), - .result_column_name = prewhere_info->prewhere_column_name, - .need_filter = prewhere_info->need_filter, + .result_column_name = row_level_filter->column_name, }); - if (!prewhere_info->remove_prewhere_column) - prewhere_steps.back().idx_in_output_block = sample_block->getPositionByName(prewhere_info->prewhere_column_name); + if (!row_level_filter->do_remove_column) + prewhere_steps.back().idx_in_output_block = sample_block->getPositionByName(row_level_filter->column_name); + } + if (prewhere_info) + { + ExpressionActions actions(prewhere_info->prewhere_actions.clone(), actions_settings); + prewhere_steps.push_back(PrewhereStep + { + .actions = std::move(actions), + .result_column_name = prewhere_info->prewhere_column_name, + .need_filter = prewhere_info->need_filter, + }); + if (!prewhere_info->remove_prewhere_column) + prewhere_steps.back().idx_in_output_block = sample_block->getPositionByName(prewhere_info->prewhere_column_name); + } /// Look up expression inputs in extended_sample_block. for (PrewhereStep & step : prewhere_steps) { diff --git a/src/Processors/QueryPlan/Optimizations/optimizeLazyMaterialization.cpp b/src/Processors/QueryPlan/Optimizations/optimizeLazyMaterialization.cpp index 950ac49c1055..a2f5275fd0f1 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeLazyMaterialization.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeLazyMaterialization.cpp @@ -131,13 +131,11 @@ static void collectLazilyReadColumnNames( for (const auto & column_name : lazily_read_column_name_set) alias_index.emplace(column_name, column_name); - if (const auto & prewhere_info = read_from_merge_tree->getPrewhereInfo()) - { - if (prewhere_info->row_level_filter) - removeUsedColumnNames(*prewhere_info->row_level_filter, lazily_read_column_name_set, alias_index, prewhere_info->row_level_column_name); + if (const auto & row_level_filter = read_from_merge_tree->getRowLevelFilter()) + removeUsedColumnNames(row_level_filter->actions, lazily_read_column_name_set, alias_index, row_level_filter->column_name); + if (const auto & prewhere_info = read_from_merge_tree->getPrewhereInfo()) removeUsedColumnNames(prewhere_info->prewhere_actions, lazily_read_column_name_set, alias_index, prewhere_info->prewhere_column_name); - } for (auto step_it = steps.rbegin(); step_it != steps.rend(); ++step_it) { diff --git a/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp b/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp index 70eb278f2332..38b5becae90a 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp @@ -140,8 +140,7 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes &) if (!storage.canMoveConditionsToPrewhere()) return; - const auto & storage_prewhere_info = source_step_with_filter->getPrewhereInfo(); - if (storage_prewhere_info) + if (source_step_with_filter->getPrewhereInfo()) return; /// TODO: We can also check for UnionStep, such as StorageBuffer and local distributed plans. @@ -196,11 +195,7 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes &) if (optimize_result.prewhere_nodes.empty()) return; - PrewhereInfoPtr prewhere_info; - if (storage_prewhere_info) - prewhere_info = storage_prewhere_info->clone(); - else - prewhere_info = std::make_shared(); + PrewhereInfoPtr prewhere_info = std::make_shared(); auto remaining_expr = splitAndFillPrewhereInfo( prewhere_info, diff --git a/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp b/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp index ce36c7bddb43..f8c8c83171f5 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp @@ -16,12 +16,11 @@ void optimizePrimaryKeyConditionAndLimit(const Stack & stack) return; const auto & storage_prewhere_info = source_step_with_filter->getPrewhereInfo(); + const auto & storage_row_level_filter = source_step_with_filter->getRowLevelFilter(); + if (storage_row_level_filter) + source_step_with_filter->addFilter(storage_row_level_filter->actions.clone(), storage_row_level_filter->column_name); if (storage_prewhere_info) - { source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions.clone(), storage_prewhere_info->prewhere_column_name); - if (storage_prewhere_info->row_level_filter) - source_step_with_filter->addFilter(storage_prewhere_info->row_level_filter->clone(), storage_prewhere_info->row_level_column_name); - } for (auto iter = stack.rbegin() + 1; iter != stack.rend(); ++iter) { diff --git a/src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp b/src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp index 666c5609c5b0..31f4aa6ce9bb 100644 --- a/src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp +++ b/src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp @@ -151,17 +151,16 @@ bool QueryDAG::buildImpl(QueryPlan::Node & node, ActionsDAG::NodeRawConstPtrs & IQueryPlanStep * step = node.step.get(); if (auto * reading = typeid_cast(step)) { + if (const auto & row_level_filter = reading->getRowLevelFilter()) + { + appendExpression(row_level_filter->actions); + if (const auto * filter_expression = findInOutputs(*dag, row_level_filter->column_name, row_level_filter->do_remove_column)) + filter_nodes.push_back(filter_expression); + else + return false; + } if (const auto & prewhere_info = reading->getPrewhereInfo()) { - if (prewhere_info->row_level_filter) - { - appendExpression(*prewhere_info->row_level_filter); - if (const auto * filter_expression = findInOutputs(*dag, prewhere_info->row_level_column_name, false)) - filter_nodes.push_back(filter_expression); - else - return false; - } - appendExpression(prewhere_info->prewhere_actions); if (const auto * filter_expression = findInOutputs(*dag, prewhere_info->prewhere_column_name, prewhere_info->remove_prewhere_column)) diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 7f824d005008..987418e42d58 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -106,13 +106,14 @@ bool restoreDAGInputs(ActionsDAG & dag, const NameSet & inputs) return added; } -bool restorePrewhereInputs(PrewhereInfo & info, const NameSet & inputs) +bool restorePrewhereInputs(FilterDAGInfo * row_level_filter, PrewhereInfo * info, const NameSet & inputs) { bool added = false; - if (info.row_level_filter) - added = added || restoreDAGInputs(*info.row_level_filter, inputs); + if (row_level_filter) + added = added || restoreDAGInputs(row_level_filter->actions, inputs); - added = added || restoreDAGInputs(info.prewhere_actions, inputs); + if (info) + added = added || restoreDAGInputs(info->prewhere_actions, inputs); return added; } @@ -212,7 +213,8 @@ static SortDescription getSortDescriptionForOutputHeader( const std::vector & reverse_flags, const int sort_direction, InputOrderInfoPtr input_order_info, - PrewhereInfoPtr prewhere_info, + const FilterDAGInfoPtr & row_level_filter, + const PrewhereInfoPtr & prewhere_info, bool enable_vertical_final) { /// Updating sort description can be done after PREWHERE actions are applied to the header. @@ -231,16 +233,16 @@ static SortDescription getSortDescriptionForOutputHeader( column.name = original_node->result_name; } } + } - if (prewhere_info->row_level_filter) + if (row_level_filter) + { + FindOriginalNodeForOutputName original_column_finder(row_level_filter->actions); + for (auto & column : original_header) { - FindOriginalNodeForOutputName original_column_finder(*prewhere_info->row_level_filter); - for (auto & column : original_header) - { - const auto * original_node = original_column_finder.find(column.name); - if (original_node) - column.name = original_node->result_name; - } + const auto * original_node = original_column_finder.find(column.name); + if (original_node) + column.name = original_node->result_name; } } @@ -327,6 +329,7 @@ ReadFromMergeTree::ReadFromMergeTree( : SourceStepWithFilter(std::make_shared(MergeTreeSelectProcessor::transformHeader( storage_snapshot_->getSampleBlockForColumns(all_column_names_), {}, + query_info_.row_level_filter, query_info_.prewhere_info)), all_column_names_, query_info_, storage_snapshot_, context_) , reader_settings(MergeTreeReaderSettings::create(context_, query_info_)) , prepared_parts(std::move(parts_)) @@ -426,7 +429,8 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_wit mutations_snapshot, shared_virtual_fields, storage_snapshot, - prewhere_info, + query_info.row_level_filter, + query_info.prewhere_info, actions_settings, reader_settings, required_columns, @@ -440,8 +444,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_wit { auto algorithm = std::make_unique(i); - auto processor = std::make_unique( - pool, std::move(algorithm), prewhere_info, lazily_read_info, actions_settings, reader_settings); + auto processor = std::make_unique(pool, std::move(algorithm), query_info.row_level_filter, query_info.prewhere_info, lazily_read_info, actions_settings, reader_settings); auto source = std::make_shared(std::move(processor), data.getLogName()); pipes.emplace_back(std::move(source)); @@ -503,7 +506,8 @@ Pipe ReadFromMergeTree::readFromPool( mutations_snapshot, shared_virtual_fields, storage_snapshot, - prewhere_info, + query_info.row_level_filter, + query_info.prewhere_info, actions_settings, reader_settings, required_columns, @@ -518,7 +522,8 @@ Pipe ReadFromMergeTree::readFromPool( mutations_snapshot, shared_virtual_fields, storage_snapshot, - prewhere_info, + query_info.row_level_filter, + query_info.prewhere_info, actions_settings, reader_settings, required_columns, @@ -535,7 +540,7 @@ Pipe ReadFromMergeTree::readFromPool( auto algorithm = std::make_unique(i); auto processor - = std::make_unique(pool, std::move(algorithm), prewhere_info, lazily_read_info, actions_settings, reader_settings); + = std::make_unique(pool, std::move(algorithm), query_info.row_level_filter, query_info.prewhere_info, lazily_read_info, actions_settings, reader_settings); auto source = std::make_shared(std::move(processor), data.getLogName()); @@ -583,7 +588,8 @@ Pipe ReadFromMergeTree::readInOrder( shared_virtual_fields, has_limit_below_one_block, storage_snapshot, - prewhere_info, + query_info.row_level_filter, + query_info.prewhere_info, actions_settings, reader_settings, required_columns, @@ -600,7 +606,8 @@ Pipe ReadFromMergeTree::readInOrder( mutations_snapshot, shared_virtual_fields, storage_snapshot, - prewhere_info, + query_info.row_level_filter, + query_info.prewhere_info, actions_settings, reader_settings, required_columns, @@ -638,7 +645,7 @@ Pipe ReadFromMergeTree::readInOrder( algorithm = std::make_unique(i); auto processor = std::make_unique( - pool, std::move(algorithm), prewhere_info, lazily_read_info, actions_settings, reader_settings); + pool, std::move(algorithm), query_info.row_level_filter, query_info.prewhere_info, lazily_read_info, actions_settings, reader_settings); processor->addPartLevelToChunk(isQueryWithFinal()); @@ -862,6 +869,7 @@ Pipe ReadFromMergeTree::readByLayers(const RangesInDataParts & parts_with_ranges auto header = std::make_shared(MergeTreeSelectProcessor::transformHeader( storage_snapshot->getSampleBlockForColumns(in_order_column_names_to_read), lazily_read_info, + query_info.row_level_filter, query_info.prewhere_info)); pipe = Pipe(std::make_shared(header)); } @@ -958,13 +966,14 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_ = settings[Setting::merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability]; std::bernoulli_distribution fault(read_split_ranges_into_intersecting_and_non_intersecting_injection_probability); + /// TODO: Should we consider row-policy filter here? if (read_type != ReadType::ParallelReplicas && num_streams > 1 && read_split_ranges_into_intersecting_and_non_intersecting_injection_probability > 0.0 && fault(thread_local_rng) && !isQueryWithFinal() && data.merging_params.is_deleted_column.empty() && - !prewhere_info && + !query_info.prewhere_info && !lazily_read_info && !reader_settings.use_query_condition_cache && /// the query condition cache produces incorrect results with intersecting ranges !isVectorColumnReplaced()) /// Vector search optimization needs ranges & offsets to be stable @@ -1072,13 +1081,13 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( /// To fix this, we prohibit removing any input in prewhere actions. Instead, projection actions will be added after sorting. /// See 02354_read_in_order_prewhere.sql as an example. bool have_input_columns_removed_after_prewhere = false; - if (prewhere_info) + if (query_info.prewhere_info || query_info.row_level_filter) { NameSet sorting_columns; for (const auto & column : storage_snapshot->metadata->getSortingKey().expression->getRequiredColumnsWithTypes()) sorting_columns.insert(column.name); - have_input_columns_removed_after_prewhere = restorePrewhereInputs(*prewhere_info, sorting_columns); + have_input_columns_removed_after_prewhere = restorePrewhereInputs(query_info.row_level_filter.get(), query_info.prewhere_info.get(), sorting_columns); } /// Let's split ranges to avoid reading much data. @@ -1489,12 +1498,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( auto sorting_expr = storage_snapshot->metadata->getSortingKey().expression; - if (prewhere_info) + if (query_info.prewhere_info || query_info.row_level_filter) { NameSet sorting_columns; for (const auto & column : storage_snapshot->metadata->getSortingKey().expression->getRequiredColumnsWithTypes()) sorting_columns.insert(column.name); - restorePrewhereInputs(*prewhere_info, sorting_columns); + restorePrewhereInputs(query_info.row_level_filter.get(), query_info.prewhere_info.get(), sorting_columns); } for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index) @@ -2098,7 +2107,8 @@ void ReadFromMergeTree::updateSortDescription() storage_snapshot->metadata->getSortingKeyReverseFlags(), getSortDirection(), query_info.input_order_info, - prewhere_info, + query_info.row_level_filter, + query_info.prewhere_info, enable_vertical_final); } @@ -2148,11 +2158,11 @@ bool ReadFromMergeTree::readsInOrder() const void ReadFromMergeTree::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value) { query_info.prewhere_info = prewhere_info_value; - prewhere_info = prewhere_info_value; output_header = std::make_shared(MergeTreeSelectProcessor::transformHeader( storage_snapshot->getSampleBlockForColumns(all_column_names), lazily_read_info, + query_info.row_level_filter, prewhere_info_value)); updateSortDescription(); @@ -2183,7 +2193,8 @@ void ReadFromMergeTree::updateLazilyReadInfo(const LazilyReadInfoPtr & lazily_re output_header = std::make_shared(MergeTreeSelectProcessor::transformHeader( storage_snapshot->getSampleBlockForColumns(all_column_names), lazily_read_info, - prewhere_info)); + query_info.row_level_filter, + query_info.prewhere_info)); /// if analysis has already been done (like in optimization for projections), /// then update columns to read in analysis result @@ -2200,7 +2211,8 @@ void ReadFromMergeTree::replaceVectorColumnWithDistanceColumn(const String & vec output_header = std::make_shared(MergeTreeSelectProcessor::transformHeader( storage_snapshot->getSampleBlockForColumns(all_column_names), lazily_read_info, - prewhere_info)); + query_info.row_level_filter, + query_info.prewhere_info)); /// if analysis has already been done (like in optimization for projections), /// then update columns to read in analysis result @@ -2320,8 +2332,8 @@ Pipe ReadFromMergeTree::spreadMarkRanges( sampling_columns.insert(column); } - if (prewhere_info) - restorePrewhereInputs(*prewhere_info, sampling_columns); + if (query_info.prewhere_info || query_info.row_level_filter) + restorePrewhereInputs(query_info.row_level_filter.get(), query_info.prewhere_info.get(), sampling_columns); } if (final) @@ -2620,33 +2632,38 @@ void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const format_settings.out << prefix << "Granules: " << result.index_stats.back().num_granules_after << '\n'; } - if (prewhere_info) + if (query_info.prewhere_info || query_info.row_level_filter) { format_settings.out << prefix << "Prewhere info" << '\n'; - format_settings.out << prefix << "Need filter: " << prewhere_info->need_filter << '\n'; + if (query_info.prewhere_info) + format_settings.out << prefix << "Need filter: " << query_info.prewhere_info->need_filter << '\n'; prefix.push_back(format_settings.indent_char); prefix.push_back(format_settings.indent_char); + } - { - format_settings.out << prefix << "Prewhere filter" << '\n'; - format_settings.out << prefix << "Prewhere filter column: " << prewhere_info->prewhere_column_name; - if (prewhere_info->remove_prewhere_column) - format_settings.out << " (removed)"; - format_settings.out << '\n'; + if (query_info.prewhere_info) + { + format_settings.out << prefix << "Prewhere filter" << '\n'; + format_settings.out << prefix << "Prewhere filter column: " << query_info.prewhere_info->prewhere_column_name; + if (query_info.prewhere_info->remove_prewhere_column) + format_settings.out << " (removed)"; + format_settings.out << '\n'; - auto expression = std::make_shared(prewhere_info->prewhere_actions.clone()); - expression->describeActions(format_settings.out, prefix); - } + auto expression = std::make_shared(query_info.prewhere_info->prewhere_actions.clone()); + expression->describeActions(format_settings.out, prefix); + } - if (prewhere_info->row_level_filter) - { - format_settings.out << prefix << "Row level filter" << '\n'; - format_settings.out << prefix << "Row level filter column: " << prewhere_info->row_level_column_name << '\n'; + if (query_info.row_level_filter) + { + format_settings.out << prefix << "Row level filter" << '\n'; + format_settings.out << prefix << "Row level filter column: " << query_info.row_level_filter->column_name; + if (query_info.row_level_filter->do_remove_column) + format_settings.out << " (removed)"; + format_settings.out << '\n'; - auto expression = std::make_shared(prewhere_info->row_level_filter->clone()); - expression->describeActions(format_settings.out, prefix); - } + auto expression = std::make_shared(query_info.row_level_filter->actions.clone()); + expression->describeActions(format_settings.out, prefix); } if (virtual_row_conversion) @@ -2665,34 +2682,37 @@ void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const map.add("Parts", result.index_stats.back().num_parts_after); map.add("Granules", result.index_stats.back().num_granules_after); } - - if (prewhere_info) + std::unique_ptr prewhere_info_map; + if (query_info.prewhere_info || query_info.row_level_filter) { - std::unique_ptr prewhere_info_map = std::make_unique(); - prewhere_info_map->add("Need filter", prewhere_info->need_filter); + prewhere_info_map = std::make_unique(); + if (query_info.prewhere_info) + prewhere_info_map->add("Need filter", query_info.prewhere_info->need_filter); + } - { - std::unique_ptr prewhere_filter_map = std::make_unique(); - prewhere_filter_map->add("Prewhere filter column", prewhere_info->prewhere_column_name); - prewhere_filter_map->add("Prewhere filter remove filter column", prewhere_info->remove_prewhere_column); - auto expression = std::make_shared(prewhere_info->prewhere_actions.clone()); - prewhere_filter_map->add("Prewhere filter expression", expression->toTree()); + if (query_info.prewhere_info) + { + std::unique_ptr prewhere_filter_map = std::make_unique(); + prewhere_filter_map->add("Prewhere filter column", query_info.prewhere_info->prewhere_column_name); + prewhere_filter_map->add("Prewhere filter remove filter column", query_info.prewhere_info->remove_prewhere_column); + auto expression = std::make_shared(query_info.prewhere_info->prewhere_actions.clone()); + prewhere_filter_map->add("Prewhere filter expression", expression->toTree()); - prewhere_info_map->add("Prewhere filter", std::move(prewhere_filter_map)); - } + prewhere_info_map->add("Prewhere filter", std::move(prewhere_filter_map)); + } - if (prewhere_info->row_level_filter) - { - std::unique_ptr row_level_filter_map = std::make_unique(); - row_level_filter_map->add("Row level filter column", prewhere_info->row_level_column_name); - auto expression = std::make_shared(prewhere_info->row_level_filter->clone()); - row_level_filter_map->add("Row level filter expression", expression->toTree()); + if (query_info.row_level_filter) + { + std::unique_ptr row_level_filter_map = std::make_unique(); + row_level_filter_map->add("Row level filter column", query_info.row_level_filter->column_name); + auto expression = std::make_shared(query_info.row_level_filter->actions.clone()); + row_level_filter_map->add("Row level filter expression", expression->toTree()); - prewhere_info_map->add("Row level filter", std::move(row_level_filter_map)); - } + prewhere_info_map->add("Row level filter", std::move(row_level_filter_map)); + } + if (prewhere_info_map) map.add("Prewhere info", std::move(prewhere_info_map)); - } if (virtual_row_conversion) map.add("Virtual row conversions", virtual_row_conversion->toTree()); diff --git a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp index a182b2f84034..54afa6e54b5d 100644 --- a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp +++ b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp @@ -74,9 +74,8 @@ void ReadFromObjectStorageStep::applyFilters(ActionDAGNodes added_filter_nodes) void ReadFromObjectStorageStep::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value) { - info = updateFormatPrewhereInfo(info, prewhere_info_value); + info = updateFormatPrewhereInfo(info, query_info.row_level_filter, prewhere_info_value); query_info.prewhere_info = prewhere_info_value; - prewhere_info = prewhere_info_value; output_header = std::make_shared(info.source_header); } @@ -99,9 +98,12 @@ void ReadFromObjectStorageStep::initializePipeline(QueryPipelineBuilder & pipeli auto parser_shared_resources = std::make_shared(context->getSettingsRef(), num_streams); - auto format_filter_info - = std::make_shared(filter_actions_dag, context, configuration->getColumnMapperForCurrentSchema()); - format_filter_info->prewhere_info = prewhere_info; + auto format_filter_info = std::make_shared( + filter_actions_dag, + context, + configuration->getColumnMapperForCurrentSchema(storage_snapshot->metadata, context), + query_info.row_level_filter, + query_info.prewhere_info); for (size_t i = 0; i < num_streams; ++i) { diff --git a/src/Processors/QueryPlan/SourceStepWithFilter.cpp b/src/Processors/QueryPlan/SourceStepWithFilter.cpp index 702ad28e71c2..00c67a0f4390 100644 --- a/src/Processors/QueryPlan/SourceStepWithFilter.cpp +++ b/src/Processors/QueryPlan/SourceStepWithFilter.cpp @@ -16,25 +16,26 @@ namespace ErrorCodes extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER; } -Block SourceStepWithFilter::applyPrewhereActions(Block block, const PrewhereInfoPtr & prewhere_info) +Block SourceStepWithFilter::applyPrewhereActions(Block block, const FilterDAGInfoPtr & row_level_filter, const PrewhereInfoPtr & prewhere_info) { - if (prewhere_info) + if (row_level_filter) { - if (prewhere_info->row_level_filter) + block = row_level_filter->actions.updateHeader(block); + auto & row_level_column = block.getByName(row_level_filter->column_name); + if (!row_level_column.type->canBeUsedInBooleanContext()) { - block = prewhere_info->row_level_filter->updateHeader(block); - auto & row_level_column = block.getByName(prewhere_info->row_level_column_name); - if (!row_level_column.type->canBeUsedInBooleanContext()) - { - throw Exception( - ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER, - "Invalid type for filter in PREWHERE: {}", - row_level_column.type->getName()); - } - - block.erase(prewhere_info->row_level_column_name); + throw Exception( + ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER, + "Invalid type for filter in PREWHERE: {}", + row_level_column.type->getName()); } + if (row_level_filter->do_remove_column) + block.erase(row_level_filter->column_name); + } + + if (prewhere_info) + { { block = prewhere_info->prewhere_actions.updateHeader(block); @@ -93,73 +94,81 @@ void SourceStepWithFilter::applyFilters(ActionDAGNodes added_filter_nodes) void SourceStepWithFilter::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value) { query_info.prewhere_info = prewhere_info_value; - prewhere_info = prewhere_info_value; - output_header = std::make_shared(applyPrewhereActions(*output_header, prewhere_info)); + output_header = std::make_shared(applyPrewhereActions(*output_header, query_info.row_level_filter, query_info.prewhere_info)); } void SourceStepWithFilter::describeActions(FormatSettings & format_settings) const { std::string prefix(format_settings.offset, format_settings.indent_char); - if (prewhere_info) + if (query_info.prewhere_info || query_info.row_level_filter) { format_settings.out << prefix << "Prewhere info" << '\n'; - format_settings.out << prefix << "Need filter: " << prewhere_info->need_filter << '\n'; + if (query_info.prewhere_info) + format_settings.out << prefix << "Need filter: " << query_info.prewhere_info->need_filter << '\n'; prefix.push_back(format_settings.indent_char); prefix.push_back(format_settings.indent_char); + } - { - format_settings.out << prefix << "Prewhere filter" << '\n'; - format_settings.out << prefix << "Prewhere filter column: " << prewhere_info->prewhere_column_name; - if (prewhere_info->remove_prewhere_column) - format_settings.out << " (removed)"; - format_settings.out << '\n'; - - auto expression = std::make_shared(prewhere_info->prewhere_actions.clone()); - expression->describeActions(format_settings.out, prefix); - } - - if (prewhere_info->row_level_filter) - { - format_settings.out << prefix << "Row level filter" << '\n'; - format_settings.out << prefix << "Row level filter column: " << prewhere_info->row_level_column_name << '\n'; + if (query_info.prewhere_info) + { + format_settings.out << prefix << "Prewhere filter" << '\n'; + format_settings.out << prefix << "Prewhere filter column: " << query_info.prewhere_info->prewhere_column_name; + if (query_info.prewhere_info->remove_prewhere_column) + format_settings.out << " (removed)"; + format_settings.out << '\n'; + + auto expression = std::make_shared(query_info.prewhere_info->prewhere_actions.clone()); + expression->describeActions(format_settings.out, prefix); + } - auto expression = std::make_shared(prewhere_info->row_level_filter->clone()); - expression->describeActions(format_settings.out, prefix); - } + if (query_info.row_level_filter) + { + format_settings.out << prefix << "Row level filter" << '\n'; + format_settings.out << prefix << "Row level filter column: " << query_info.row_level_filter->column_name; + if (query_info.row_level_filter->do_remove_column) + format_settings.out << " (removed)"; + format_settings.out << '\n'; + + auto expression = std::make_shared(query_info.row_level_filter->actions.clone()); + expression->describeActions(format_settings.out, prefix); } } void SourceStepWithFilter::describeActions(JSONBuilder::JSONMap & map) const { - if (prewhere_info) + std::unique_ptr prewhere_info_map; + if (query_info.prewhere_info || query_info.row_level_filter) { - std::unique_ptr prewhere_info_map = std::make_unique(); - prewhere_info_map->add("Need filter", prewhere_info->need_filter); + prewhere_info_map = std::make_unique(); + if (query_info.prewhere_info) + prewhere_info_map->add("Need filter", query_info.prewhere_info->need_filter); + } - { - std::unique_ptr prewhere_filter_map = std::make_unique(); - prewhere_filter_map->add("Prewhere filter column", prewhere_info->prewhere_column_name); - prewhere_filter_map->add("Prewhere filter remove filter column", prewhere_info->remove_prewhere_column); - auto expression = std::make_shared(prewhere_info->prewhere_actions.clone()); - prewhere_filter_map->add("Prewhere filter expression", expression->toTree()); + if (query_info.prewhere_info) + { + std::unique_ptr prewhere_filter_map = std::make_unique(); + prewhere_filter_map->add("Prewhere filter column", query_info.prewhere_info->prewhere_column_name); + prewhere_filter_map->add("Prewhere filter remove filter column", query_info.prewhere_info->remove_prewhere_column); + auto expression = std::make_shared(query_info.prewhere_info->prewhere_actions.clone()); + prewhere_filter_map->add("Prewhere filter expression", expression->toTree()); - prewhere_info_map->add("Prewhere filter", std::move(prewhere_filter_map)); - } + prewhere_info_map->add("Prewhere filter", std::move(prewhere_filter_map)); + } - if (prewhere_info->row_level_filter) - { - std::unique_ptr row_level_filter_map = std::make_unique(); - row_level_filter_map->add("Row level filter column", prewhere_info->row_level_column_name); - auto expression = std::make_shared(prewhere_info->row_level_filter->clone()); - row_level_filter_map->add("Row level filter expression", expression->toTree()); + if (query_info.row_level_filter) + { + std::unique_ptr row_level_filter_map = std::make_unique(); + row_level_filter_map->add("Row level filter column", query_info.row_level_filter->column_name); + auto expression = std::make_shared(query_info.row_level_filter->actions.clone()); + row_level_filter_map->add("Row level filter expression", expression->toTree()); - prewhere_info_map->add("Row level filter", std::move(row_level_filter_map)); - } + prewhere_info_map->add("Row level filter", std::move(row_level_filter_map)); + } + if (prewhere_info_map) map.add("Prewhere info", std::move(prewhere_info_map)); - } } } diff --git a/src/Processors/QueryPlan/SourceStepWithFilter.h b/src/Processors/QueryPlan/SourceStepWithFilter.h index f92af5245494..c5bd3dab9077 100644 --- a/src/Processors/QueryPlan/SourceStepWithFilter.h +++ b/src/Processors/QueryPlan/SourceStepWithFilter.h @@ -62,6 +62,7 @@ class SourceStepWithFilterBase : public ISourceStep } virtual void applyFilters(ActionDAGNodes added_filter_nodes); + virtual FilterDAGInfoPtr getRowLevelFilter() const { return nullptr; } virtual PrewhereInfoPtr getPrewhereInfo() const { return nullptr; } const std::shared_ptr & getFilterActionsDAG() const { return filter_actions_dag; } @@ -103,7 +104,6 @@ class SourceStepWithFilter : public SourceStepWithFilterBase : SourceStepWithFilterBase(std::move(output_header_)) , required_source_columns(column_names_) , query_info(query_info_) - , prewhere_info(query_info.prewhere_info) , storage_snapshot(storage_snapshot_) , context(context_) { @@ -112,7 +112,8 @@ class SourceStepWithFilter : public SourceStepWithFilterBase SourceStepWithFilter(const SourceStepWithFilter &) = default; const SelectQueryInfo & getQueryInfo() const { return query_info; } - PrewhereInfoPtr getPrewhereInfo() const override { return prewhere_info; } + FilterDAGInfoPtr getRowLevelFilter() const override { return query_info.row_level_filter; } + PrewhereInfoPtr getPrewhereInfo() const override { return query_info.prewhere_info; } ContextPtr getContext() const { return context; } const StorageSnapshotPtr & getStorageSnapshot() const { return storage_snapshot; } @@ -128,12 +129,11 @@ class SourceStepWithFilter : public SourceStepWithFilterBase void describeActions(JSONBuilder::JSONMap & map) const override; - static Block applyPrewhereActions(Block block, const PrewhereInfoPtr & prewhere_info); + static Block applyPrewhereActions(Block block, const FilterDAGInfoPtr & row_level_filter, const PrewhereInfoPtr & prewhere_info); protected: Names required_source_columns; SelectQueryInfo query_info; - PrewhereInfoPtr prewhere_info; StorageSnapshotPtr storage_snapshot; ContextPtr context; }; diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index 215c832624c1..cb96f682606a 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -410,11 +410,6 @@ std::string PrewhereInfo::dump() const WriteBufferFromOwnString ss; ss << "PrewhereDagInfo\n"; - if (row_level_filter) - { - ss << "row_level_filter " << row_level_filter->dumpDAG() << "\n"; - } - { ss << "prewhere_actions " << prewhere_actions.dumpDAG() << "\n"; } diff --git a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index b767f5b623c8..602c968b642e 100644 --- a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -348,6 +348,7 @@ MergeTreeReadTaskColumns getReadTaskColumns( const IMergeTreeDataPartInfoForReader & data_part_info_for_reader, const StorageSnapshotPtr & storage_snapshot, const Names & required_columns, + const FilterDAGInfoPtr & row_level_filter, const PrewhereInfoPtr & prewhere_info, const PrewhereExprSteps & mutation_steps, const ExpressionActionsSettings & actions_settings, @@ -421,9 +422,10 @@ MergeTreeReadTaskColumns getReadTaskColumns( for (const auto & step : mutation_steps) add_step(*step); - if (prewhere_info) + if (prewhere_info || row_level_filter) { auto prewhere_actions = MergeTreeSelectProcessor::getPrewhereActions( + row_level_filter, prewhere_info, actions_settings, reader_settings.enable_multiple_prewhere_read_steps, reader_settings.force_short_circuit_execution); @@ -454,6 +456,7 @@ MergeTreeReadTaskColumns getReadTaskColumnsForMerge( data_part_info_for_reader, storage_snapshot, required_columns, + /*row_level_filter=*/ nullptr, /*prewhere_info=*/ nullptr, mutation_steps, /*actions_settings=*/ {}, diff --git a/src/Storages/MergeTree/MergeTreeBlockReadUtils.h b/src/Storages/MergeTree/MergeTreeBlockReadUtils.h index 6bf95b605a5a..9745069d454d 100644 --- a/src/Storages/MergeTree/MergeTreeBlockReadUtils.h +++ b/src/Storages/MergeTree/MergeTreeBlockReadUtils.h @@ -33,6 +33,7 @@ MergeTreeReadTaskColumns getReadTaskColumns( const IMergeTreeDataPartInfoForReader & data_part_info_for_reader, const StorageSnapshotPtr & storage_snapshot, const Names & required_columns, + const FilterDAGInfoPtr & row_level_filter, const PrewhereInfoPtr & prewhere_info, const PrewhereExprSteps & mutation_steps, const ExpressionActionsSettings & actions_settings, diff --git a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp index af3e1aabaa36..2dd137676c7a 100644 --- a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp @@ -106,6 +106,7 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool( MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const ExpressionActionsSettings & actions_settings_, const MergeTreeReaderSettings & reader_settings_, @@ -118,6 +119,7 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool( std::move(mutations_snapshot_), std::move(shared_virtual_fields_), storage_snapshot_, + row_level_filter_, prewhere_info_, actions_settings_, reader_settings_, diff --git a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.h b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.h index 2c6687002dc0..d07681a00f00 100644 --- a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.h +++ b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.h @@ -22,6 +22,7 @@ class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const ExpressionActionsSettings & actions_settings_, const MergeTreeReaderSettings & reader_settings_, diff --git a/src/Storages/MergeTree/MergeTreeReadPool.cpp b/src/Storages/MergeTree/MergeTreeReadPool.cpp index 9fb56a1e9381..db173e064f97 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -40,6 +40,7 @@ MergeTreeReadPool::MergeTreeReadPool( MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const ExpressionActionsSettings & actions_settings_, const MergeTreeReaderSettings & reader_settings_, @@ -52,6 +53,7 @@ MergeTreeReadPool::MergeTreeReadPool( std::move(mutations_snapshot_), std::move(shared_virtual_fields_), storage_snapshot_, + row_level_filter_, prewhere_info_, actions_settings_, reader_settings_, diff --git a/src/Storages/MergeTree/MergeTreeReadPool.h b/src/Storages/MergeTree/MergeTreeReadPool.h index 1b55284c592b..8891158d2c14 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.h +++ b/src/Storages/MergeTree/MergeTreeReadPool.h @@ -29,6 +29,7 @@ class MergeTreeReadPool : public MergeTreeReadPoolBase MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const ExpressionActionsSettings & actions_settings_, const MergeTreeReaderSettings & reader_settings_, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolBase.cpp b/src/Storages/MergeTree/MergeTreeReadPoolBase.cpp index e2f615574ff6..3bbbe618e9dc 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolBase.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolBase.cpp @@ -29,6 +29,7 @@ MergeTreeReadPoolBase::MergeTreeReadPoolBase( MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const ExpressionActionsSettings & actions_settings_, const MergeTreeReaderSettings & reader_settings_, @@ -41,6 +42,7 @@ MergeTreeReadPoolBase::MergeTreeReadPoolBase( , mutations_snapshot(std::move(mutations_snapshot_)) , shared_virtual_fields(std::move(shared_virtual_fields_)) , storage_snapshot(storage_snapshot_) + , row_level_filter(row_level_filter_) , prewhere_info(prewhere_info_) , actions_settings(actions_settings_) , reader_settings(reader_settings_) @@ -188,6 +190,7 @@ void MergeTreeReadPoolBase::fillPerPartInfos(const Settings & settings) part_info, storage_snapshot, column_names, + row_level_filter, prewhere_info, read_task_info.mutation_steps, actions_settings, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolBase.h b/src/Storages/MergeTree/MergeTreeReadPoolBase.h index eb4314642619..eae6df8b5e78 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolBase.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolBase.h @@ -32,6 +32,7 @@ class MergeTreeReadPoolBase : public IMergeTreeReadPool, protected WithContext MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const ExpressionActionsSettings & actions_settings_, const MergeTreeReaderSettings & reader_settings_, @@ -48,6 +49,7 @@ class MergeTreeReadPoolBase : public IMergeTreeReadPool, protected WithContext const MutationsSnapshotPtr mutations_snapshot; const VirtualFields shared_virtual_fields; const StorageSnapshotPtr storage_snapshot; + const FilterDAGInfoPtr row_level_filter; const PrewhereInfoPtr prewhere_info; const ExpressionActionsSettings actions_settings; const MergeTreeReaderSettings reader_settings; diff --git a/src/Storages/MergeTree/MergeTreeReadPoolInOrder.cpp b/src/Storages/MergeTree/MergeTreeReadPoolInOrder.cpp index c4244ecd9820..1aa6b5eedc19 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolInOrder.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolInOrder.cpp @@ -15,6 +15,7 @@ MergeTreeReadPoolInOrder::MergeTreeReadPoolInOrder( MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const ExpressionActionsSettings & actions_settings_, const MergeTreeReaderSettings & reader_settings_, @@ -27,6 +28,7 @@ MergeTreeReadPoolInOrder::MergeTreeReadPoolInOrder( std::move(mutations_snapshot_), std::move(shared_virtual_fields_), storage_snapshot_, + row_level_filter_, prewhere_info_, actions_settings_, reader_settings_, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolInOrder.h b/src/Storages/MergeTree/MergeTreeReadPoolInOrder.h index 41f3ab1061c1..35c968acb28c 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolInOrder.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolInOrder.h @@ -14,6 +14,7 @@ class MergeTreeReadPoolInOrder : public MergeTreeReadPoolBase MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const ExpressionActionsSettings & actions_settings_, const MergeTreeReaderSettings & reader_settings_, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp index c3e5d609b8ff..20af6def427e 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp @@ -112,6 +112,7 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas( MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const ExpressionActionsSettings & actions_settings_, const MergeTreeReaderSettings & reader_settings_, @@ -124,6 +125,7 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas( std::move(mutations_snapshot_), std::move(shared_virtual_fields_), storage_snapshot_, + row_level_filter_, prewhere_info_, actions_settings_, reader_settings_, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h index 63816340eb1d..b5fbd4efc1ba 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h @@ -14,6 +14,7 @@ class MergeTreeReadPoolParallelReplicas : public MergeTreeReadPoolBase MutationsSnapshotPtr mutations_snapshot_, VirtualFields shared_virtual_fields_, const StorageSnapshotPtr & storage_snapshot_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const ExpressionActionsSettings & actions_settings_, const MergeTreeReaderSettings & reader_settings_, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp index cdc84ff6c0eb..6ff17f078e29 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp @@ -28,6 +28,7 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd VirtualFields shared_virtual_fields_, bool has_limit_below_one_block_, const StorageSnapshotPtr & storage_snapshot_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const ExpressionActionsSettings & actions_settings_, const MergeTreeReaderSettings & reader_settings_, @@ -40,6 +41,7 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd std::move(mutations_snapshot_), std::move(shared_virtual_fields_), storage_snapshot_, + row_level_filter_, prewhere_info_, actions_settings_, reader_settings_, diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h index e7fad180657b..c8fb1f2c89ee 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h @@ -16,6 +16,7 @@ class MergeTreeReadPoolParallelReplicasInOrder : public MergeTreeReadPoolBase VirtualFields shared_virtual_fields_, bool has_limit_below_one_block_, const StorageSnapshotPtr & storage_snapshot_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const ExpressionActionsSettings & actions_settings_, const MergeTreeReaderSettings & reader_settings_, diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index 642a0f4ad4e2..a351719b2c1d 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -92,22 +92,25 @@ std::optional ParallelReadingExtension::sendReadRequest( MergeTreeSelectProcessor::MergeTreeSelectProcessor( MergeTreeReadPoolPtr pool_, MergeTreeSelectAlgorithmPtr algorithm_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const LazilyReadInfoPtr & lazily_read_info_, const ExpressionActionsSettings & actions_settings_, const MergeTreeReaderSettings & reader_settings_) : pool(std::move(pool_)) , algorithm(std::move(algorithm_)) + , row_level_filter(row_level_filter_) , prewhere_info(prewhere_info_) , actions_settings(actions_settings_) , prewhere_actions(getPrewhereActions( + row_level_filter, prewhere_info, actions_settings, reader_settings_.enable_multiple_prewhere_read_steps, reader_settings_.force_short_circuit_execution)) , lazily_read_info(lazily_read_info_) , reader_settings(reader_settings_) - , result_header(transformHeader(pool->getHeader(), lazily_read_info, prewhere_info)) + , result_header(transformHeader(pool->getHeader(), lazily_read_info, row_level_filter, prewhere_info)) { bool has_prewhere_actions_steps = !prewhere_actions.steps.empty(); if (has_prewhere_actions_steps) @@ -127,43 +130,46 @@ String MergeTreeSelectProcessor::getName() const bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, PrewhereExprInfo & prewhere, bool force_short_circuit_execution); -PrewhereExprInfo MergeTreeSelectProcessor::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, bool enable_multiple_prewhere_read_steps, bool force_short_circuit_execution) +PrewhereExprInfo MergeTreeSelectProcessor::getPrewhereActions( + const FilterDAGInfoPtr & row_level_filter, + const PrewhereInfoPtr & prewhere_info, + const ExpressionActionsSettings & actions_settings, + bool enable_multiple_prewhere_read_steps, + bool force_short_circuit_execution) { PrewhereExprInfo prewhere_actions; - if (prewhere_info) + + if (row_level_filter) { - if (prewhere_info->row_level_filter) + PrewhereExprStep row_level_filter_step { - PrewhereExprStep row_level_filter_step - { - .type = PrewhereExprStep::Filter, - .actions = std::make_shared(prewhere_info->row_level_filter->clone(), actions_settings), - .filter_column_name = prewhere_info->row_level_column_name, - .remove_filter_column = true, - .need_filter = true, - .perform_alter_conversions = true, - .mutation_version = std::nullopt, - }; - - prewhere_actions.steps.emplace_back(std::make_shared(std::move(row_level_filter_step))); - } + .type = PrewhereExprStep::Filter, + .actions = std::make_shared(row_level_filter->actions.clone(), actions_settings), + .filter_column_name = row_level_filter->column_name, + .remove_filter_column = row_level_filter->do_remove_column, + .need_filter = true, + .perform_alter_conversions = true, + .mutation_version = std::nullopt, + }; + + prewhere_actions.steps.emplace_back(std::make_shared(std::move(row_level_filter_step))); + } - if (!enable_multiple_prewhere_read_steps || - !tryBuildPrewhereSteps(prewhere_info, actions_settings, prewhere_actions, force_short_circuit_execution)) + if (!enable_multiple_prewhere_read_steps || + !tryBuildPrewhereSteps(prewhere_info, actions_settings, prewhere_actions, force_short_circuit_execution)) + { + PrewhereExprStep prewhere_step { - PrewhereExprStep prewhere_step - { - .type = PrewhereExprStep::Filter, - .actions = std::make_shared(prewhere_info->prewhere_actions.clone(), actions_settings), - .filter_column_name = prewhere_info->prewhere_column_name, - .remove_filter_column = prewhere_info->remove_prewhere_column, - .need_filter = prewhere_info->need_filter, - .perform_alter_conversions = true, - .mutation_version = std::nullopt, - }; - - prewhere_actions.steps.emplace_back(std::make_shared(std::move(prewhere_step))); - } + .type = PrewhereExprStep::Filter, + .actions = std::make_shared(prewhere_info->prewhere_actions.clone(), actions_settings), + .filter_column_name = prewhere_info->prewhere_column_name, + .remove_filter_column = prewhere_info->remove_prewhere_column, + .need_filter = prewhere_info->need_filter, + .perform_alter_conversions = true, + .mutation_version = std::nullopt, + }; + + prewhere_actions.steps.emplace_back(std::make_shared(std::move(prewhere_step))); } return prewhere_actions; @@ -329,9 +335,10 @@ void MergeTreeSelectProcessor::injectLazilyReadColumns( Block MergeTreeSelectProcessor::transformHeader( Block block, const LazilyReadInfoPtr & lazily_read_info, + const FilterDAGInfoPtr & row_level_filter, const PrewhereInfoPtr & prewhere_info) { - auto transformed = SourceStepWithFilter::applyPrewhereActions(std::move(block), prewhere_info); + auto transformed = SourceStepWithFilter::applyPrewhereActions(std::move(block), row_level_filter, prewhere_info); injectLazilyReadColumns(0, transformed, -1, lazily_read_info); return transformed; } diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/src/Storages/MergeTree/MergeTreeSelectProcessor.h index 7c898c91e36e..de04d99f9c74 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -63,6 +63,7 @@ class MergeTreeSelectProcessor : private boost::noncopyable MergeTreeSelectProcessor( MergeTreeReadPoolPtr pool_, MergeTreeSelectAlgorithmPtr algorithm_, + const FilterDAGInfoPtr & row_level_filter_, const PrewhereInfoPtr & prewhere_info_, const LazilyReadInfoPtr & lazily_read_info_, const ExpressionActionsSettings & actions_settings_, @@ -73,6 +74,7 @@ class MergeTreeSelectProcessor : private boost::noncopyable static Block transformHeader( Block block, const LazilyReadInfoPtr & lazily_read_info, + const FilterDAGInfoPtr & row_level_filter, const PrewhereInfoPtr & prewhere_info); Block getHeader() const { return result_header; } @@ -84,7 +86,8 @@ class MergeTreeSelectProcessor : private boost::noncopyable const MergeTreeReaderSettings & getSettings() const { return reader_settings; } static PrewhereExprInfo getPrewhereActions( - PrewhereInfoPtr prewhere_info, + const FilterDAGInfoPtr & row_level_filter, + const PrewhereInfoPtr & prewhere_info, const ExpressionActionsSettings & actions_settings, bool enable_multiple_prewhere_read_steps, bool force_short_circuit_execution); @@ -106,6 +109,7 @@ class MergeTreeSelectProcessor : private boost::noncopyable const MergeTreeReadPoolPtr pool; const MergeTreeSelectAlgorithmPtr algorithm; + const FilterDAGInfoPtr row_level_filter; const PrewhereInfoPtr prewhere_info; const ExpressionActionsSettings actions_settings; const PrewhereExprInfo prewhere_actions; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp index 0076b1d52ddf..256915b49829 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp @@ -247,7 +247,7 @@ void writeDataFiles( 8192, format_settings, parser_shared_resources, - std::make_shared(nullptr, context, nullptr), + std::make_shared(nullptr, context, nullptr, nullptr, nullptr), true /* is_remote_fs */, chooseCompressionMethod(data_file->data_object_info->getPath(), configuration->compression_method), false); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp index c7ef9a654903..7dfbee6f1672 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp @@ -150,7 +150,7 @@ std::optional writeDataFiles( field_ids[IcebergPositionDeleteTransform::positions_column_name] = IcebergPositionDeleteTransform::positions_column_field_id; field_ids[IcebergPositionDeleteTransform::data_file_path_column_name] = IcebergPositionDeleteTransform::data_file_path_column_field_id; column_mapper->setStorageColumnEncoding(std::move(field_ids)); - FormatFilterInfoPtr format_filter_info = std::make_shared(nullptr, context, column_mapper); + FormatFilterInfoPtr format_filter_info = std::make_shared(nullptr, context, column_mapper, nullptr, nullptr); auto output_format = FormatFactory::instance().getOutputFormat( configuration->format, *write_buffer, delete_file_sample_block, context, format_settings, format_filter_info); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp index 1599d93433bb..1f5a10a27dba 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp @@ -111,7 +111,7 @@ void IcebergPositionDeleteTransform::initializeDeleteSources() context->getSettingsRef()[DB::Setting::max_block_size], format_settings, std::make_shared(context->getSettingsRef(), 1), - std::make_shared(actions_dag_ptr, context, nullptr), + std::make_shared(actions_dag_ptr, context, nullptr, nullptr, nullptr), true /* is_remote_fs */, compression_method); diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 0d333b9a9713..11765fc01ad2 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -370,10 +370,10 @@ void StorageObjectStorage::read( /*supports_tuple_elements=*/ supports_prewhere, local_context, PrepareReadingFromFormatHiveParams { file_columns, hive_partition_columns_to_read_from_file_path.getNameToTypeMap() }); - if (query_info.prewhere_info) - read_from_format_info = updateFormatPrewhereInfo(read_from_format_info, query_info.prewhere_info); + if (query_info.prewhere_info || query_info.row_level_filter) + read_from_format_info = updateFormatPrewhereInfo(read_from_format_info, query_info.row_level_filter, query_info.prewhere_info); - const bool need_only_count = (query_info.optimize_trivial_count || (read_from_format_info.requested_columns.empty() && !read_from_format_info.prewhere_info)) + const bool need_only_count = (query_info.optimize_trivial_count || (read_from_format_info.requested_columns.empty() && !read_from_format_info.prewhere_info && !read_from_format_info.row_level_filter)) && local_context->getSettingsRef()[Setting::optimize_count_from_files]; auto modified_format_settings{format_settings}; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index db47a7fc7945..7a66fc212818 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -547,7 +547,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade auto mapper = configuration->getColumnMapperForObject(object_info); if (!mapper) return format_filter_info; - return std::make_shared(format_filter_info->filter_actions_dag, format_filter_info->context.lock(), mapper); + return std::make_shared(format_filter_info->filter_actions_dag, format_filter_info->context.lock(), mapper, nullptr, nullptr); }(); auto input_format = FormatFactory::instance().getInput( diff --git a/src/Storages/SelectQueryInfo.cpp b/src/Storages/SelectQueryInfo.cpp index b08b511e2701..b83afc2d5f6a 100644 --- a/src/Storages/SelectQueryInfo.cpp +++ b/src/Storages/SelectQueryInfo.cpp @@ -39,51 +39,53 @@ std::unordered_map SelectQueryInfo::buildNod return node_name_to_input_node_column; } -PrewhereInfoPtr PrewhereInfo::clone() const +PrewhereInfo PrewhereInfo::clone() const { - PrewhereInfoPtr prewhere_info = std::make_shared(); + PrewhereInfo prewhere_info; - if (row_level_filter) - prewhere_info->row_level_filter = row_level_filter->clone(); - - prewhere_info->prewhere_actions = prewhere_actions.clone(); - - prewhere_info->row_level_column_name = row_level_column_name; - prewhere_info->prewhere_column_name = prewhere_column_name; - prewhere_info->remove_prewhere_column = remove_prewhere_column; - prewhere_info->need_filter = need_filter; - prewhere_info->generated_by_optimizer = generated_by_optimizer; + prewhere_info.prewhere_actions = prewhere_actions.clone(); + prewhere_info.prewhere_column_name = prewhere_column_name; + prewhere_info.remove_prewhere_column = remove_prewhere_column; + prewhere_info.need_filter = need_filter; return prewhere_info; } void PrewhereInfo::serialize(IQueryPlanStep::Serialization & ctx) const { - writeBinary(row_level_filter.has_value(), ctx.out); - if (row_level_filter.has_value()) - row_level_filter->serialize(ctx.out, ctx.registry); prewhere_actions.serialize(ctx.out, ctx.registry); - writeStringBinary(row_level_column_name, ctx.out); writeStringBinary(prewhere_column_name, ctx.out); writeBinary(remove_prewhere_column, ctx.out); - writeBinary(need_filter, ctx.out); - writeBinary(generated_by_optimizer, ctx.out); } -PrewhereInfoPtr PrewhereInfo::deserialize(IQueryPlanStep::Deserialization & ctx) +PrewhereInfo PrewhereInfo::deserialize(IQueryPlanStep::Deserialization & ctx) { - PrewhereInfoPtr result = std::make_shared(); - bool has_row_level_filter; - readBinary(has_row_level_filter, ctx.in); - if (has_row_level_filter) - result->row_level_filter = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context); - result->prewhere_actions = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context); - readStringBinary(result->row_level_column_name, ctx.in); - readStringBinary(result->prewhere_column_name, ctx.in); - readBinary(result->remove_prewhere_column, ctx.in); - readBinary(result->need_filter, ctx.in); - readBinary(result->generated_by_optimizer, ctx.in); - return result; + PrewhereInfo prewhere_info; + + prewhere_info.prewhere_actions = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context); + readStringBinary(prewhere_info.prewhere_column_name, ctx.in); + readBinary(prewhere_info.remove_prewhere_column, ctx.in); + prewhere_info.need_filter = true; + + return prewhere_info; +} + +void FilterDAGInfo::serialize(IQueryPlanStep::Serialization & ctx) const +{ + actions.serialize(ctx.out, ctx.registry); + writeStringBinary(column_name, ctx.out); + writeBinary(do_remove_column, ctx.out); +} + +FilterDAGInfo FilterDAGInfo::deserialize(IQueryPlanStep::Deserialization & ctx) +{ + FilterDAGInfo filter_dag_info; + + filter_dag_info.actions = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context); + readStringBinary(filter_dag_info.column_name, ctx.in); + readBinary(filter_dag_info.do_remove_column, ctx.in); + + return filter_dag_info; } } diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index 77448e4c1ee1..2252424e259e 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -42,16 +42,11 @@ using PreparedSetsPtr = std::shared_ptr; struct PrewhereInfo { - /// Actions for row level security filter. Applied separately before prewhere_actions. - /// This actions are separate because prewhere condition should not be executed over filtered rows. - std::optional row_level_filter; /// Actions which are executed on block in order to get filter column for prewhere step. ActionsDAG prewhere_actions; - String row_level_column_name; String prewhere_column_name; bool remove_prewhere_column = false; bool need_filter = false; - bool generated_by_optimizer = false; PrewhereInfo() = default; explicit PrewhereInfo(ActionsDAG prewhere_actions_, String prewhere_column_name_) @@ -59,10 +54,10 @@ struct PrewhereInfo std::string dump() const; - PrewhereInfoPtr clone() const; + PrewhereInfo clone() const; void serialize(IQueryPlanStep::Serialization & ctx) const; - static PrewhereInfoPtr deserialize(IQueryPlanStep::Deserialization & ctx); + static PrewhereInfo deserialize(IQueryPlanStep::Deserialization & ctx); }; /// Same as FilterInfo, but with ActionsDAG. @@ -73,6 +68,9 @@ struct FilterDAGInfo bool do_remove_column = false; std::string dump() const; + + void serialize(IQueryPlanStep::Serialization & ctx) const; + static FilterDAGInfo deserialize(IQueryPlanStep::Deserialization & ctx); }; struct InputOrderInfo @@ -190,6 +188,10 @@ struct SelectQueryInfo bool has_window = false; bool has_order_by = false; bool need_aggregate = false; + + /// Actions for row level security filter. Applied separately before prewhere. + /// This actions are separate because prewhere condition should not be executed over filtered rows. + FilterDAGInfoPtr row_level_filter; PrewhereInfoPtr prewhere_info; /// If query has aggregate functions diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 21a5e11f52df..4b8cbc4bbfba 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -354,27 +354,35 @@ void StorageBuffer::read( else { auto src_table_query_info = query_info; - if (src_table_query_info.prewhere_info) + ActionsDAG converting_dag; + if (src_table_query_info.prewhere_info || src_table_query_info.row_level_filter) { - src_table_query_info.prewhere_info = src_table_query_info.prewhere_info->clone(); + converting_dag = ActionsDAG::makeConvertingActions( + header_after_adding_defaults.getColumnsWithTypeAndName(), + header.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Name); + } - auto actions_dag = ActionsDAG::makeConvertingActions( - header_after_adding_defaults.getColumnsWithTypeAndName(), - header.getColumnsWithTypeAndName(), - ActionsDAG::MatchColumnsMode::Name); + if (src_table_query_info.row_level_filter) + { + auto row_level_filter = std::make_shared(); + row_level_filter->column_name = src_table_query_info.row_level_filter->column_name; + row_level_filter->do_remove_column = src_table_query_info.row_level_filter->do_remove_column; - if (src_table_query_info.prewhere_info->row_level_filter) - { - src_table_query_info.prewhere_info->row_level_filter = ActionsDAG::merge( - actions_dag.clone(), - std::move(*src_table_query_info.prewhere_info->row_level_filter)); + row_level_filter->actions = ActionsDAG::merge( + converting_dag.clone(), + src_table_query_info.row_level_filter->actions.clone()); - src_table_query_info.prewhere_info->row_level_filter->removeUnusedActions(); - } + row_level_filter->actions.removeUnusedActions(); + src_table_query_info.row_level_filter = std::move(row_level_filter); + } + if (src_table_query_info.prewhere_info) + { + src_table_query_info.prewhere_info = std::make_shared(src_table_query_info.prewhere_info->clone()); { src_table_query_info.prewhere_info->prewhere_actions = ActionsDAG::merge( - actions_dag.clone(), + converting_dag.clone(), std::move(src_table_query_info.prewhere_info->prewhere_actions)); src_table_query_info.prewhere_info->prewhere_actions.removeUnusedActions(); @@ -479,23 +487,23 @@ void StorageBuffer::read( } else { - if (query_info.prewhere_info) + if (query_info.row_level_filter) { ExpressionActionsSettings actions_settings(local_context); - - if (query_info.prewhere_info->row_level_filter) + auto actions = std::make_shared(query_info.row_level_filter->actions.clone(), actions_settings); + pipe_from_buffers.addSimpleTransform([&](const SharedHeader & header) { - auto actions = std::make_shared(query_info.prewhere_info->row_level_filter->clone(), actions_settings); - pipe_from_buffers.addSimpleTransform([&](const SharedHeader & header) - { - return std::make_shared( - header, - actions, - query_info.prewhere_info->row_level_column_name, - false); - }); - } + return std::make_shared( + header, + actions, + query_info.row_level_filter->column_name, + query_info.row_level_filter->do_remove_column); + }); + } + if (query_info.prewhere_info) + { + ExpressionActionsSettings actions_settings(local_context); auto actions = std::make_shared(query_info.prewhere_info->prewhere_actions.clone(), actions_settings); pipe_from_buffers.addSimpleTransform([&](const SharedHeader & header) { diff --git a/src/Storages/StorageDummy.cpp b/src/Storages/StorageDummy.cpp index 95878e69d3b1..dc98e4e4a0ff 100644 --- a/src/Storages/StorageDummy.cpp +++ b/src/Storages/StorageDummy.cpp @@ -57,7 +57,7 @@ ReadFromDummy::ReadFromDummy( const ContextPtr & context_, const StorageDummy & storage_) : SourceStepWithFilter(std::make_shared(SourceStepWithFilter::applyPrewhereActions( - storage_snapshot_->getSampleBlockForColumns(column_names_), query_info_.prewhere_info)), + storage_snapshot_->getSampleBlockForColumns(column_names_), query_info_.row_level_filter, query_info_.prewhere_info)), column_names_, query_info_, storage_snapshot_, diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 917433d6c237..57b807903e84 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -1640,9 +1640,8 @@ void ReadFromFile::applyFilters(ActionDAGNodes added_filter_nodes) void ReadFromFile::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value) { - info = updateFormatPrewhereInfo(info, prewhere_info_value); + info = updateFormatPrewhereInfo(info, query_info.row_level_filter, prewhere_info_value); query_info.prewhere_info = prewhere_info_value; - prewhere_info = prewhere_info_value; output_header = std::make_shared(info.source_header); } @@ -1691,9 +1690,9 @@ void StorageFile::read( PrepareReadingFromFormatHiveParams {file_columns, hive_partition_columns_to_read_from_file_path.getNameToTypeMap()}); if (query_info.prewhere_info) - read_from_format_info = updateFormatPrewhereInfo(read_from_format_info, query_info.prewhere_info); + read_from_format_info = updateFormatPrewhereInfo(read_from_format_info, query_info.row_level_filter, query_info.prewhere_info); - bool need_only_count = (query_info.optimize_trivial_count || (read_from_format_info.requested_columns.empty() && !read_from_format_info.prewhere_info)) + bool need_only_count = (query_info.optimize_trivial_count || (read_from_format_info.requested_columns.empty() && !read_from_format_info.prewhere_info && !read_from_format_info.row_level_filter)) && context->getSettingsRef()[Setting::optimize_count_from_files]; auto reading = std::make_unique( @@ -1752,8 +1751,7 @@ void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const Bui progress_callback(FileProgress(0, storage->total_bytes_to_read)); auto parser_shared_resources = std::make_shared(ctx->getSettingsRef(), num_streams); - auto format_filter_info = std::make_shared(filter_actions_dag, ctx, nullptr); - format_filter_info->prewhere_info = prewhere_info; + auto format_filter_info = std::make_shared(filter_actions_dag, ctx, nullptr, query_info.row_level_filter, query_info.prewhere_info); for (size_t i = 0; i < num_streams; ++i) { diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 275f34ef781b..83e6b5566c63 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -1168,9 +1168,8 @@ void ReadFromURL::applyFilters(ActionDAGNodes added_filter_nodes) void ReadFromURL::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value) { - info = updateFormatPrewhereInfo(info, prewhere_info_value); + info = updateFormatPrewhereInfo(info, query_info.row_level_filter, prewhere_info_value); query_info.prewhere_info = prewhere_info_value; - prewhere_info = prewhere_info_value; output_header = std::make_shared(info.source_header); } @@ -1193,10 +1192,10 @@ void IStorageURLBase::read( /*supports_tuple_elements=*/ supports_prewhere, PrepareReadingFromFormatHiveParams {file_columns, hive_partition_columns_to_read_from_file_path.getNameToTypeMap()}); - if (query_info.prewhere_info) - read_from_format_info = updateFormatPrewhereInfo(read_from_format_info, query_info.prewhere_info); + if (query_info.prewhere_info || query_info.row_level_filter) + read_from_format_info = updateFormatPrewhereInfo(read_from_format_info, query_info.row_level_filter, query_info.prewhere_info); - bool need_only_count = (query_info.optimize_trivial_count || (read_from_format_info.requested_columns.empty() && !read_from_format_info.prewhere_info)) + bool need_only_count = (query_info.optimize_trivial_count || (read_from_format_info.requested_columns.empty() && !read_from_format_info.prewhere_info && !read_from_format_info.row_level_filter)) && local_context->getSettingsRef()[Setting::optimize_count_from_files]; auto read_post_data_callback = getReadPOSTDataCallback( @@ -1309,8 +1308,7 @@ void ReadFromURL::initializePipeline(QueryPipelineBuilder & pipeline, const Buil pipes.reserve(num_streams); auto parser_shared_resources = std::make_shared(settings, num_streams); - auto format_filter_info = std::make_shared(filter_actions_dag, context, nullptr); - format_filter_info->prewhere_info = prewhere_info; + auto format_filter_info = std::make_shared(filter_actions_dag, context, nullptr, query_info.row_level_filter, query_info.prewhere_info); for (size_t i = 0; i < num_streams; ++i) { @@ -1374,10 +1372,10 @@ void StorageURLWithFailover::read( /*supports_tuple_elements=*/ supports_prewhere, PrepareReadingFromFormatHiveParams {file_columns, hive_partition_columns_to_read_from_file_path.getNameToTypeMap()}); - if (query_info.prewhere_info) - read_from_format_info = updateFormatPrewhereInfo(read_from_format_info, query_info.prewhere_info); + if (query_info.prewhere_info || query_info.row_level_filter) + read_from_format_info = updateFormatPrewhereInfo(read_from_format_info, query_info.row_level_filter, query_info.prewhere_info); - bool need_only_count = (query_info.optimize_trivial_count || (read_from_format_info.requested_columns.empty() && !read_from_format_info.prewhere_info)) + bool need_only_count = (query_info.optimize_trivial_count || (read_from_format_info.requested_columns.empty() && !read_from_format_info.prewhere_info && !read_from_format_info.row_level_filter)) && local_context->getSettingsRef()[Setting::optimize_count_from_files]; auto read_post_data_callback = getReadPOSTDataCallback( diff --git a/src/Storages/prepareReadingFromFormat.cpp b/src/Storages/prepareReadingFromFormat.cpp index 3b31a9b0d2ae..9d24a5563124 100644 --- a/src/Storages/prepareReadingFromFormat.cpp +++ b/src/Storages/prepareReadingFromFormat.cpp @@ -241,11 +241,11 @@ ReadFromFormatInfo prepareReadingFromFormat( return info; } -ReadFromFormatInfo updateFormatPrewhereInfo(const ReadFromFormatInfo & info, const PrewhereInfoPtr & prewhere_info) +ReadFromFormatInfo updateFormatPrewhereInfo(const ReadFromFormatInfo & info, const FilterDAGInfoPtr & row_level_filter, const PrewhereInfoPtr & prewhere_info) { chassert(prewhere_info); - if (info.prewhere_info) + if (info.prewhere_info || info.row_level_filter) throw Exception(ErrorCodes::LOGICAL_ERROR, "updateFormatPrewhereInfo called more than once"); ReadFromFormatInfo new_info; @@ -253,7 +253,7 @@ ReadFromFormatInfo updateFormatPrewhereInfo(const ReadFromFormatInfo & info, con /// Removes columns that are only used as prewhere input. /// Adds prewhere result column if !remove_prewhere_column. - new_info.format_header = SourceStepWithFilter::applyPrewhereActions(info.format_header, prewhere_info); + new_info.format_header = SourceStepWithFilter::applyPrewhereActions(info.format_header, row_level_filter, prewhere_info); /// We assume that any format that supports prewhere also supports subset of subcolumns, so we /// don't need to replace subcolumns with their nested columns etc. @@ -362,7 +362,7 @@ ReadFromFormatInfo ReadFromFormatInfo::deserialize(IQueryPlanStep::Deserializati bool has_prewhere_info; readBinary(has_prewhere_info, ctx.in); if (has_prewhere_info) - result.prewhere_info = PrewhereInfo::deserialize(ctx); + result.prewhere_info = std::make_shared(PrewhereInfo::deserialize(ctx)); ctx.in >> "\n"; diff --git a/src/Storages/prepareReadingFromFormat.h b/src/Storages/prepareReadingFromFormat.h index 27951f1da8dd..19f766e45ddf 100644 --- a/src/Storages/prepareReadingFromFormat.h +++ b/src/Storages/prepareReadingFromFormat.h @@ -10,6 +10,9 @@ namespace DB struct PrewhereInfo; using PrewhereInfoPtr = std::shared_ptr; + struct FilterDAGInfo; + using FilterDAGInfoPtr = std::shared_ptr; + struct ReadFromFormatInfo { /// Header that will return Source from storage. @@ -40,6 +43,7 @@ namespace DB /// The list of hive partition columns. It shall be read from the path regardless if it is present in the file NamesAndTypesList hive_partition_columns_to_read_from_file_path; PrewhereInfoPtr prewhere_info; + FilterDAGInfoPtr row_level_filter; }; struct PrepareReadingFromFormatHiveParams @@ -70,7 +74,7 @@ namespace DB bool supports_tuple_elements = false, const PrepareReadingFromFormatHiveParams & hive_parameters = {}); - ReadFromFormatInfo updateFormatPrewhereInfo(const ReadFromFormatInfo & info, const PrewhereInfoPtr & prewhere_info); + ReadFromFormatInfo updateFormatPrewhereInfo(const ReadFromFormatInfo & info, const FilterDAGInfoPtr & row_level_filter, const PrewhereInfoPtr & prewhere_info); /// Returns the serialization hints from the insertion table (if it's set in the Context). SerializationInfoByName getSerializationHintsForFileLikeStorage(const StorageMetadataPtr & metadata_snapshot, const ContextPtr & context); diff --git a/tests/queries/0_stateless/02679_explain_merge_tree_prewhere_row_policy.reference b/tests/queries/0_stateless/02679_explain_merge_tree_prewhere_row_policy.reference index 4a4e338438b2..669d0fe248d3 100644 --- a/tests/queries/0_stateless/02679_explain_merge_tree_prewhere_row_policy.reference +++ b/tests/queries/0_stateless/02679_explain_merge_tree_prewhere_row_policy.reference @@ -19,7 +19,7 @@ Positions: 0 1 FUNCTION equals(id : 0, 5 :: 1) -> equals(id, 5) UInt8 : 2 Positions: 2 0 Row level filter - Row level filter column: greaterOrEquals(id, 5) + Row level filter column: greaterOrEquals(id, 5) (removed) Actions: INPUT : 0 -> id UInt64 : 0 COLUMN Const(UInt8) -> 5 UInt8 : 1 FUNCTION greaterOrEquals(id : 0, 5 :: 1) -> greaterOrEquals(id, 5) UInt8 : 2 @@ -49,7 +49,7 @@ Positions: 1 2 FUNCTION equals(id : 0, 5_UInt8 :: 1) -> equals(id, 5_UInt8) UInt8 : 2 Positions: 2 0 Row level filter - Row level filter column: greaterOrEquals(id, 5_UInt8) + Row level filter column: greaterOrEquals(id, 5_UInt8) (removed) Actions: INPUT : 0 -> id UInt64 : 0 COLUMN Const(UInt8) -> 5_UInt8 UInt8 : 1 FUNCTION greaterOrEquals(id : 0, 5_UInt8 :: 1) -> greaterOrEquals(id, 5_UInt8) UInt8 : 2 diff --git a/tests/queries/0_stateless/03591_optimize_prewhere_row_policy.reference b/tests/queries/0_stateless/03591_optimize_prewhere_row_policy.reference new file mode 100644 index 000000000000..ec5934037622 --- /dev/null +++ b/tests/queries/0_stateless/03591_optimize_prewhere_row_policy.reference @@ -0,0 +1,144 @@ +-- {echoOn} + +SET use_query_condition_cache = 0; +SET enable_parallel_replicas = 0; +DROP TABLE IF EXISTS 03591_test; +DROP ROW POLICY IF EXISTS 03591_rp ON 03591_test; +CREATE TABLE 03591_test (a Int32, b Int32) ENGINE=MergeTree ORDER BY tuple(); +INSERT INTO 03591_test VALUES (3, 1), (2, 2), (3, 2); +SELECT * FROM 03591_test; +3 1 +2 2 +3 2 +SELECT * FROM 03591_test WHERE throwIf(b=1, 'Should throw') SETTINGS optimize_move_to_prewhere = 1; -- {serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO} +CREATE ROW POLICY 03591_rp ON 03591_test USING b=2 TO CURRENT_USER; +SELECT * FROM 03591_test; +2 2 +3 2 +-- Print plan with actions to make sure both a > 0 and b=2 are present in the prewhere section +EXPLAIN PLAN actions=1 SELECT * FROM 03591_test WHERE a > 0 SETTINGS optimize_move_to_prewhere = 1, allow_experimental_analyzer = 1; +Expression ((Project names + Projection)) +Actions: INPUT : 0 -> __table1.b Int32 : 0 + INPUT : 1 -> __table1.a Int32 : 1 + ALIAS __table1.b :: 0 -> b Int32 : 2 + ALIAS __table1.a :: 1 -> a Int32 : 0 +Positions: 0 2 + Expression ((WHERE + Change column names to column identifiers)) + Actions: INPUT : 0 -> b Int32 : 0 + INPUT : 1 -> a Int32 : 1 + ALIAS b :: 0 -> __table1.b Int32 : 2 + ALIAS a :: 1 -> __table1.a Int32 : 0 + Positions: 2 0 + ReadFromMergeTree (default.03591_test) + ReadType: Default + Parts: 1 + Granules: 1 + Prewhere info + Need filter: 1 + Prewhere filter + Prewhere filter column: greater(__table1.a, 0_UInt8) (removed) + Actions: INPUT : 0 -> a Int32 : 0 + COLUMN Const(UInt8) -> 0_UInt8 UInt8 : 1 + FUNCTION greater(a : 0, 0_UInt8 :: 1) -> greater(__table1.a, 0_UInt8) UInt8 : 2 + Positions: 0 2 + Row level filter + Row level filter column: equals(b, 2_UInt8) (removed) + Actions: INPUT : 0 -> b Int32 : 0 + COLUMN Const(UInt8) -> 2_UInt8 UInt8 : 1 + FUNCTION equals(b : 0, 2_UInt8 :: 1) -> equals(b, 2_UInt8) UInt8 : 2 + Positions: 2 0 +EXPLAIN PLAN actions=1 SELECT * FROM 03591_test WHERE a > 0 SETTINGS optimize_move_to_prewhere = 1, allow_experimental_analyzer = 0; +Expression ((Projection + Before ORDER BY)) +Actions: INPUT :: 0 -> a Int32 : 0 + INPUT :: 1 -> b Int32 : 1 +Positions: 0 1 + Expression (WHERE) + Actions: INPUT :: 0 -> a Int32 : 0 + INPUT :: 1 -> b Int32 : 1 + Positions: 0 1 + ReadFromMergeTree (default.03591_test) + ReadType: Default + Parts: 1 + Granules: 1 + Prewhere info + Need filter: 1 + Prewhere filter + Prewhere filter column: greater(a, 0) (removed) + Actions: INPUT : 0 -> a Int32 : 0 + COLUMN Const(UInt8) -> 0 UInt8 : 1 + FUNCTION greater(a : 0, 0 :: 1) -> greater(a, 0) UInt8 : 2 + Positions: 0 2 + Row level filter + Row level filter column: equals(b, 2) (removed) + Actions: INPUT : 0 -> b Int32 : 0 + COLUMN Const(UInt8) -> 2 UInt8 : 1 + FUNCTION equals(b : 0, 2 :: 1) -> equals(b, 2) UInt8 : 2 + Positions: 2 0 +SELECT * FROM 03591_test WHERE throwIf(b=1, 'Should not throw because b=1 is not visible to this user due to the b=2 row policy') SETTINGS optimize_move_to_prewhere = 1; +-- Print plan with actions to make sure a > 0, b = 2 and a = 3 are present in the prewhere section +EXPLAIN PLAN actions=1 SELECT * FROM 03591_test WHERE a > 0 SETTINGS optimize_move_to_prewhere = 1, additional_table_filters={'03591_test': 'a=3'}, allow_experimental_analyzer = 1; +Expression ((Project names + Projection)) +Actions: INPUT : 0 -> __table1.a Int32 : 0 + INPUT : 1 -> __table1.b Int32 : 1 + ALIAS __table1.a :: 0 -> a Int32 : 2 + ALIAS __table1.b :: 1 -> b Int32 : 0 +Positions: 2 0 + Expression (((WHERE + Change column names to column identifiers) + additional filter)) + Actions: INPUT : 1 -> b Int32 : 0 + INPUT : 0 -> a Int32 : 1 + ALIAS b :: 0 -> __table1.b Int32 : 2 + ALIAS a :: 1 -> __table1.a Int32 : 0 + Positions: 0 2 + ReadFromMergeTree (default.03591_test) + ReadType: Default + Parts: 1 + Granules: 1 + Prewhere info + Need filter: 1 + Prewhere filter + Prewhere filter column: and(equals(a, 3_UInt8), greater(__table1.a, 0_UInt8)) (removed) + Actions: INPUT : 0 -> a Int32 : 0 + COLUMN Const(UInt8) -> 3_UInt8 UInt8 : 1 + COLUMN Const(UInt8) -> 0_UInt8 UInt8 : 2 + FUNCTION equals(a : 0, 3_UInt8 :: 1) -> equals(a, 3_UInt8) UInt8 : 3 + FUNCTION greater(a : 0, 0_UInt8 :: 2) -> greater(__table1.a, 0_UInt8) UInt8 : 1 + FUNCTION and(equals(a, 3_UInt8) :: 3, greater(__table1.a, 0_UInt8) :: 1) -> and(equals(a, 3_UInt8), greater(__table1.a, 0_UInt8)) UInt8 : 2 + Positions: 0 2 + Row level filter + Row level filter column: equals(b, 2_UInt8) (removed) + Actions: INPUT : 0 -> b Int32 : 0 + COLUMN Const(UInt8) -> 2_UInt8 UInt8 : 1 + FUNCTION equals(b : 0, 2_UInt8 :: 1) -> equals(b, 2_UInt8) UInt8 : 2 + Positions: 2 0 +EXPLAIN PLAN actions=1 SELECT * FROM 03591_test WHERE a > 0 SETTINGS optimize_move_to_prewhere = 1, additional_table_filters={'03591_test': 'a=3'}, allow_experimental_analyzer = 0; +Expression ((Projection + Before ORDER BY)) +Actions: INPUT :: 0 -> a Int32 : 0 + INPUT :: 1 -> b Int32 : 1 +Positions: 0 1 + Expression ((WHERE + Additional filter)) + Actions: INPUT :: 0 -> a Int32 : 0 + INPUT :: 1 -> b Int32 : 1 + Positions: 0 1 + ReadFromMergeTree (default.03591_test) + ReadType: Default + Parts: 1 + Granules: 1 + Prewhere info + Need filter: 1 + Prewhere filter + Prewhere filter column: and(equals(a, 3), greater(a, 0)) (removed) + Actions: INPUT : 0 -> a Int32 : 0 + COLUMN Const(UInt8) -> 3 UInt8 : 1 + COLUMN Const(UInt8) -> 0 UInt8 : 2 + FUNCTION equals(a : 0, 3 :: 1) -> equals(a, 3) UInt8 : 3 + FUNCTION greater(a : 0, 0 :: 2) -> greater(a, 0) UInt8 : 1 + FUNCTION and(equals(a, 3) :: 3, greater(a, 0) :: 1) -> and(equals(a, 3), greater(a, 0)) UInt8 : 2 + Positions: 0 2 + Row level filter + Row level filter column: equals(b, 2) (removed) + Actions: INPUT : 0 -> b Int32 : 0 + COLUMN Const(UInt8) -> 2 UInt8 : 1 + FUNCTION equals(b : 0, 2 :: 1) -> equals(b, 2) UInt8 : 2 + Positions: 2 0 +DROP ROW POLICY 03591_rp ON 03591_test; +SELECT * FROM 03591_test WHERE throwIf(b=2, 'Should throw') SETTINGS optimize_move_to_prewhere = 1; -- {serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO} diff --git a/tests/queries/0_stateless/03591_optimize_prewhere_row_policy.sql b/tests/queries/0_stateless/03591_optimize_prewhere_row_policy.sql new file mode 100644 index 000000000000..b010fadff97b --- /dev/null +++ b/tests/queries/0_stateless/03591_optimize_prewhere_row_policy.sql @@ -0,0 +1,34 @@ +-- {echoOn} + +SET use_query_condition_cache = 0; +SET enable_parallel_replicas = 0; + +DROP TABLE IF EXISTS 03591_test; + +DROP ROW POLICY IF EXISTS 03591_rp ON 03591_test; + +CREATE TABLE 03591_test (a Int32, b Int32) ENGINE=MergeTree ORDER BY tuple(); + +INSERT INTO 03591_test VALUES (3, 1), (2, 2), (3, 2); + +SELECT * FROM 03591_test; + +SELECT * FROM 03591_test WHERE throwIf(b=1, 'Should throw') SETTINGS optimize_move_to_prewhere = 1; -- {serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO} + +CREATE ROW POLICY 03591_rp ON 03591_test USING b=2 TO CURRENT_USER; + +SELECT * FROM 03591_test; + +-- Print plan with actions to make sure both a > 0 and b=2 are present in the prewhere section +EXPLAIN PLAN actions=1 SELECT * FROM 03591_test WHERE a > 0 SETTINGS optimize_move_to_prewhere = 1, allow_experimental_analyzer = 1; +EXPLAIN PLAN actions=1 SELECT * FROM 03591_test WHERE a > 0 SETTINGS optimize_move_to_prewhere = 1, allow_experimental_analyzer = 0; + +SELECT * FROM 03591_test WHERE throwIf(b=1, 'Should not throw because b=1 is not visible to this user due to the b=2 row policy') SETTINGS optimize_move_to_prewhere = 1; + +-- Print plan with actions to make sure a > 0, b = 2 and a = 3 are present in the prewhere section +EXPLAIN PLAN actions=1 SELECT * FROM 03591_test WHERE a > 0 SETTINGS optimize_move_to_prewhere = 1, additional_table_filters={'03591_test': 'a=3'}, allow_experimental_analyzer = 1; +EXPLAIN PLAN actions=1 SELECT * FROM 03591_test WHERE a > 0 SETTINGS optimize_move_to_prewhere = 1, additional_table_filters={'03591_test': 'a=3'}, allow_experimental_analyzer = 0; + +DROP ROW POLICY 03591_rp ON 03591_test; + +SELECT * FROM 03591_test WHERE throwIf(b=2, 'Should throw') SETTINGS optimize_move_to_prewhere = 1; -- {serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO} From af22847dc408dca512a607fceb981ba0061d2ace Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Sat, 27 Dec 2025 00:20:09 +0100 Subject: [PATCH 2/2] fix build --- src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp index 54afa6e54b5d..69b8202166ac 100644 --- a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp +++ b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp @@ -101,7 +101,7 @@ void ReadFromObjectStorageStep::initializePipeline(QueryPipelineBuilder & pipeli auto format_filter_info = std::make_shared( filter_actions_dag, context, - configuration->getColumnMapperForCurrentSchema(storage_snapshot->metadata, context), + configuration->getColumnMapperForCurrentSchema(), query_info.row_level_filter, query_info.prewhere_info);