diff --git a/crates/hotblocks/src/api.rs b/crates/hotblocks/src/api.rs index b187e17..7c3d73b 100644 --- a/crates/hotblocks/src/api.rs +++ b/crates/hotblocks/src/api.rs @@ -50,6 +50,7 @@ pub fn build_api(app: App) -> Router { Router::new() .route("/", get(|| async { "Welcome to SQD hot block data service!" })) .route("/datasets/{id}/stream", post(stream)) + .route("/datasets/{id}/finalized-stream", post(finalized_stream)) .route("/datasets/{id}/head", get(get_head)) .route("/datasets/{id}/finalized-head", get(get_finalized_head)) .route("/datasets/{id}/retention", get(get_retention).post(set_retention)) @@ -66,6 +67,27 @@ async fn stream( Path(dataset_id): Path, Json(query): Json ) -> Response +{ + stream_internal(app, dataset_id, query, false).await +} + + +async fn finalized_stream( + Extension(app): Extension, + Path(dataset_id): Path, + Json(query): Json +) -> Response +{ + stream_internal(app, dataset_id, query, true).await +} + + +async fn stream_internal( + app: AppRef, + dataset_id: DatasetId, + query: Query, + finalized: bool +) -> Response { let dataset = get_dataset!(app, dataset_id); @@ -73,18 +95,29 @@ async fn stream( return text!(StatusCode::BAD_REQUEST, "{}", err) } - match app.query_service.query(&dataset, query).await { + let query_result = if finalized { + app.query_service.query_finalized(&dataset, query).await + } else { + app.query_service.query(&dataset, query).await + }; + + match query_result { Ok(stream) => { let mut res = Response::builder() .status(200) .header("content-type", "text/plain") .header("content-encoding", "gzip"); - if let Some(head) = stream.finalized_head() { - let head_block = head.number.max(dataset.get_head_block_number().unwrap_or(0)); - res = res.header("x-sqd-head-number", head_block); - res = res.header("x-sqd-finalized-head-number", head.number); - res = res.header("x-sqd-finalized-head-hash", head.hash.as_str()); + if let Some(finalized_head) = stream.finalized_head() { + if finalized { + // For finalized stream, use the finalized head as the head + res = res.header("x-sqd-head-number", finalized_head.number); + } else { + let head_block = finalized_head.number.max(dataset.get_head_block_number().unwrap_or(0)); + res = res.header("x-sqd-head-number", head_block); + } + res = res.header("x-sqd-finalized-head-number", finalized_head.number); + res = res.header("x-sqd-finalized-head-hash", finalized_head.hash.as_str()); } else if let Some(head_block) = dataset.get_head_block_number() { res = res.header("x-sqd-head-number", head_block); } diff --git a/crates/hotblocks/src/dataset_controller/dataset_controller.rs b/crates/hotblocks/src/dataset_controller/dataset_controller.rs index f145b4d..6a89a27 100644 --- a/crates/hotblocks/src/dataset_controller/dataset_controller.rs +++ b/crates/hotblocks/src/dataset_controller/dataset_controller.rs @@ -135,6 +135,18 @@ impl DatasetController { recv.changed().await.unwrap() } } + + pub async fn wait_for_finalized_block(&self, block_number: BlockNumber) -> BlockNumber { + let mut recv = self.finalized_head_receiver.clone(); + loop { + if let Some(block) = recv.borrow_and_update().as_ref() { + if block.number >= block_number { + return block.number + } + } + recv.changed().await.unwrap() + } + } } diff --git a/crates/hotblocks/src/query/response.rs b/crates/hotblocks/src/query/response.rs index fef1ff2..a0568bb 100644 --- a/crates/hotblocks/src/query/response.rs +++ b/crates/hotblocks/src/query/response.rs @@ -24,6 +24,7 @@ impl QueryResponse { db: DBRef, dataset_id: DatasetId, query: Query, + only_finalized: bool, ) -> anyhow::Result { let Some(slot) = executor.get_slot() else { @@ -33,7 +34,7 @@ impl QueryResponse { let start = Instant::now(); let mut runner = slot.run(move |slot| -> anyhow::Result<_> { - let mut runner = RunningQuery::new(db, dataset_id, &query).map(Box::new)?; + let mut runner = RunningQuery::new(db, dataset_id, &query, only_finalized).map(Box::new)?; next_run(&mut runner, slot)?; Ok(runner) }).await?; diff --git a/crates/hotblocks/src/query/running.rs b/crates/hotblocks/src/query/running.rs index 7d60b5e..87cd0f4 100644 --- a/crates/hotblocks/src/query/running.rs +++ b/crates/hotblocks/src/query/running.rs @@ -33,7 +33,8 @@ impl RunningQuery { pub fn new( db: DBRef, dataset_id: DatasetId, - query: &Query + query: &Query, + only_finalized: bool, ) -> anyhow::Result { let snapshot = StaticSnapshot::new(db); @@ -95,9 +96,24 @@ impl RunningQuery { query.compile() }; + let last_block = if only_finalized { + // Cap the query's last_block to the finalized head + if let Some(finalized_head) = &finalized_head { + let capped_last = query + .last_block() + .map(|end| end.min(finalized_head.number)) + .or(Some(finalized_head.number)); + capped_last + } else { + anyhow::bail!("Finalized head is not available yet"); + } + } else { + query.last_block() + }; + Ok(Self { plan, - last_block: query.last_block(), + last_block, left_over: None, next_chunk: Some(Ok(first_chunk)), chunk_iterator, diff --git a/crates/hotblocks/src/query/service.rs b/crates/hotblocks/src/query/service.rs index 29e52bf..c6bf9ae 100644 --- a/crates/hotblocks/src/query/service.rs +++ b/crates/hotblocks/src/query/service.rs @@ -77,7 +77,28 @@ impl QueryService { QueryServiceBuilder::new(db) } - pub async fn query(&self, dataset: &DatasetController, query: Query) -> anyhow::Result { + pub async fn query( + &self, + dataset: &DatasetController, + query: Query, + ) -> anyhow::Result { + self.query_internal(dataset, query, false).await + } + + pub async fn query_finalized( + &self, + dataset: &DatasetController, + query: Query, + ) -> anyhow::Result { + self.query_internal(dataset, query, true).await + } + + async fn query_internal( + &self, + dataset: &DatasetController, + query: Query, + finalized: bool, + ) -> anyhow::Result { ensure!( dataset.dataset_kind() == DatasetKind::from_query(&query), QueryKindMismatch { @@ -86,7 +107,13 @@ impl QueryService { } ); - let should_wait = match dataset.get_head() { + let target_head = if finalized { + dataset.get_finalized_head() + } else { + dataset.get_head() + }; + + let should_wait = match target_head { Some(head) if head.number >= query.first_block() => false, Some(head) if head.number + 1 == query.first_block() => { if let Some(parent_hash) = query.parent_block_hash() { @@ -107,13 +134,16 @@ impl QueryService { let Some(_wait_slot) = self.wait_slots.get() else { bail!(Busy) }; - tokio::time::timeout( - Duration::from_secs(5), - dataset.wait_for_block(query.first_block()) - ).await.map_err(|_| { - QueryIsAboveTheHead { - finalized_head: None + tokio::time::timeout(Duration::from_secs(5), async { + if finalized { + dataset.wait_for_finalized_block(query.first_block()).await + } else { + dataset.wait_for_block(query.first_block()).await } + }) + .await + .map_err(|_| QueryIsAboveTheHead { + finalized_head: None })?; } @@ -121,7 +151,8 @@ impl QueryService { self.executor.clone(), self.db.clone(), dataset.dataset_id(), - query + query, + finalized, ).await } }