Skip to content

Commit 29422a9

Browse files
authored
Add config to fail if search targets too many splits (#6009)
1 parent 63e7edf commit 29422a9

File tree

3 files changed

+69
-0
lines changed

3 files changed

+69
-0
lines changed

quickwit/quickwit-config/src/node_config/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ pub struct SearcherConfig {
269269
pub partial_request_cache_capacity: ByteSize,
270270
pub predicate_cache_capacity: ByteSize,
271271
pub max_num_concurrent_split_searches: usize,
272+
pub max_splits_per_search: Option<usize>,
272273
// Deprecated: stream search requests are no longer supported.
273274
#[serde(alias = "max_num_concurrent_split_streams", default, skip_serializing)]
274275
pub _max_num_concurrent_split_streams: Option<serde::de::IgnoredAny>,
@@ -327,6 +328,7 @@ impl Default for SearcherConfig {
327328
partial_request_cache_capacity: ByteSize::mb(64),
328329
predicate_cache_capacity: ByteSize::mb(256),
329330
max_num_concurrent_split_searches: 100,
331+
max_splits_per_search: None,
330332
_max_num_concurrent_split_streams: None,
331333
aggregation_memory_limit: ByteSize::mb(500),
332334
aggregation_bucket_limit: 65000,

quickwit/quickwit-config/src/node_config/serialize.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,7 @@ mod tests {
663663
partial_request_cache_capacity: ByteSize::mb(64),
664664
predicate_cache_capacity: ByteSize::mb(256),
665665
max_num_concurrent_split_searches: 150,
666+
max_splits_per_search: None,
666667
_max_num_concurrent_split_streams: Some(serde::de::IgnoredAny),
667668
split_cache: None,
668669
request_timeout_secs: NonZeroU64::new(30).unwrap(),

quickwit/quickwit-search/src/root.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1230,6 +1230,21 @@ pub async fn root_search(
12301230
"root_search"
12311231
);
12321232

1233+
if let Some(max_total_split_searches) = searcher_context.searcher_config.max_splits_per_search
1234+
&& max_total_split_searches < num_splits
1235+
{
1236+
tracing::error!(
1237+
num_splits,
1238+
max_total_split_searches,
1239+
index=?search_request.index_id_patterns,
1240+
query=%search_request.query_ast,
1241+
"max total splits exceeded"
1242+
);
1243+
return Err(SearchError::InvalidArgument(format!(
1244+
"Number of targeted splits {num_splits} exceeds the limit {max_total_split_searches}"
1245+
)));
1246+
}
1247+
12331248
let mut search_response_result = RootSearchMetricsFuture {
12341249
start: start_instant,
12351250
tracked: root_search_aux(
@@ -5235,4 +5250,55 @@ mod tests {
52355250
assert_eq!(search_response.failed_splits.len(), 1);
52365251
Ok(())
52375252
}
5253+
5254+
#[tokio::test]
5255+
async fn test_root_search_too_many_splits() -> anyhow::Result<()> {
5256+
let search_request = quickwit_proto::search::SearchRequest {
5257+
index_id_patterns: vec!["test-index".to_string()],
5258+
query_ast: qast_json_helper("test", &["body"]),
5259+
max_hits: 10,
5260+
..Default::default()
5261+
};
5262+
let mut mock_metastore = MockMetastoreService::new();
5263+
let index_metadata = IndexMetadata::for_test("test-index", "ram:///test-index");
5264+
let index_uid = index_metadata.index_uid.clone();
5265+
mock_metastore
5266+
.expect_list_indexes_metadata()
5267+
.returning(move |_index_ids_query| {
5268+
Ok(ListIndexesMetadataResponse::for_test(vec![
5269+
index_metadata.clone(),
5270+
]))
5271+
});
5272+
mock_metastore
5273+
.expect_list_splits()
5274+
.returning(move |_filter| {
5275+
let splits = vec![
5276+
MockSplitBuilder::new("split1")
5277+
.with_index_uid(&index_uid)
5278+
.build(),
5279+
MockSplitBuilder::new("split2")
5280+
.with_index_uid(&index_uid)
5281+
.build(),
5282+
];
5283+
let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap();
5284+
Ok(ServiceStream::from(vec![Ok(splits_response)]))
5285+
});
5286+
let mock_search_service = MockSearchService::new();
5287+
let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service)]);
5288+
let search_job_placer = SearchJobPlacer::new(searcher_pool);
5289+
let cluster_client = ClusterClient::new(search_job_placer.clone());
5290+
5291+
let mut searcher_context = SearcherContext::for_test();
5292+
searcher_context.searcher_config.max_splits_per_search = Some(1);
5293+
let search_error = root_search(
5294+
&searcher_context,
5295+
search_request,
5296+
MetastoreServiceClient::from_mock(mock_metastore),
5297+
&cluster_client,
5298+
)
5299+
.await
5300+
.unwrap_err();
5301+
assert!(matches!(search_error, SearchError::InvalidArgument { .. }));
5302+
Ok(())
5303+
}
52385304
}

0 commit comments

Comments
 (0)