From 15be462dab016e0e865f2226c359d6c9bf1166d3 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 16 Dec 2025 09:06:42 +0100 Subject: [PATCH 1/5] Ensure all-ephemeral transactions don't consume a tx offset Views are materialized in mutable transactions, but should not increment the transaction offset maintained in the committed state. This fixes storing completely empty transactions in the commitlog, and maintains that the committed state tx offset is in sync with the commitlog's tx offset. --- crates/core/src/db/relational_db.rs | 44 ++++++++++++++++--- .../locking_tx_datastore/committed_state.rs | 7 ++- crates/datastore/src/traits.rs | 36 +++++++++++---- 3 files changed, 70 insertions(+), 17 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 99f57806247..bf881ecefac 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -818,12 +818,7 @@ impl RelationalDB { Txdata, }; - let is_not_ephemeral_table = |table_id: &TableId| -> bool { - tx_data - .ephemeral_tables() - .map(|etables| !etables.contains(table_id)) - .unwrap_or(true) - }; + let is_not_ephemeral_table = |table_id: &TableId| -> bool { !tx_data.is_ephemeral_table(table_id) }; if tx_data.tx_offset().is_some() { let inserts: Box<_> = tx_data @@ -849,10 +844,15 @@ impl RelationalDB { .filter(|ops| !truncates.contains(&ops.table_id)) .collect(); - let truncates = truncates.into_iter().filter(is_not_ephemeral_table).collect(); + let truncates: Box<_> = truncates.into_iter().filter(is_not_ephemeral_table).collect(); let inputs = reducer_context.map(|rcx| rcx.into()); + debug_assert!( + !(inserts.is_empty() && truncates.is_empty() && deletes.is_empty() && inputs.is_none()), + "empty transaction" + ); + let txdata = Txdata { inputs, outputs: None, @@ -2632,6 +2632,36 @@ mod tests { Ok(()) } + #[test] + fn test_view_materialization_does_not_consume_tx_offset() -> ResultTest<()> { + let stdb = 
TestDB::durable_without_snapshot_repo()?; + + let (tx_offset_1, view_id, table_id) = { + let module_def = view_module_def(); + let view_def = module_def.view("my_view").unwrap(); + + let mut tx = begin_mut_tx(&stdb); + let (view_id, table_id) = stdb.create_view(&mut tx, &module_def, view_def)?; + let (tx_offset, tx_data, ..) = stdb.commit_tx(tx)?.unwrap(); + assert_eq!(Some(tx_offset), tx_data.tx_offset()); + + (tx_offset, view_id, table_id) + }; + + let mut tx = begin_mut_tx(&stdb); + tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?; + stdb.materialize_view(&mut tx, table_id, Identity::ONE, vec![product![10u8]])?; + let (tx_offset_2, tx_data, ..) = stdb.commit_tx(tx)?.unwrap(); + + // `tx_data.tx_offset()` should return `None`, + // so that it is not considered for durability. + // The tx offset reported for confirmed reads should stay the same. + assert!(tx_data.tx_offset().is_none()); + assert_eq!(tx_offset_1, tx_offset_2); + + Ok(()) + } + #[test] fn test_view_tables_are_ephemeral_with_snapshot() -> ResultTest<()> { let stdb = TestDB::durable()?; diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 41e5615c18e..c5a3a9d1a8b 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -747,6 +747,11 @@ impl CommittedState { // so that we can pass updated set of table ids. self.merge_read_sets(read_sets); + // Store in `tx_data` which of the updated tables are ephemeral. + // NOTE: This must be called before `tx_consumes_offset`, so that + // all-ephemeral transactions do not consume a tx offset. + tx_data.set_ephemeral_tables(&self.ephemeral_tables); + // If the TX will be logged, record its projected tx offset, // then increment the counter. 
if self.tx_consumes_offset(&tx_data, ctx) { @@ -754,8 +759,6 @@ impl CommittedState { self.next_tx_offset += 1; } - tx_data.set_ephemeral_tables(&self.ephemeral_tables); - tx_data } diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index e1161b5a11b..644d0032fae 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -236,7 +236,7 @@ impl TxData { /// Determines which ephemeral tables were modified in this transaction. /// /// Iterates over the tables updated in this transaction and records those that - /// also appear in `all_ephemeral_tables`. + /// also appear in `all_ephemeral_tables`. /// `self.ephemeral_tables` remains `None` if no ephemeral tables were modified. pub fn set_ephemeral_tables(&mut self, all_ephemeral_tables: &EphemeralTables) { for tid in self.tables.keys() { @@ -252,6 +252,16 @@ impl TxData { self.ephemeral_tables.as_ref() } + /// Check if `table_id` is in the set of ephemeral tables for this transaction. + /// + /// Beware that ephemeral tables are known only after [Self::set_ephemeral_tables] + /// has been called. + pub fn is_ephemeral_table(&self, table_id: &TableId) -> bool { + self.ephemeral_tables + .as_ref() + .is_some_and(|etables| etables.contains(table_id)) + } + /// Obtain an iterator over the inserted rows per table. pub fn inserts(&self) -> impl Iterator)> + '_ { self.inserts.iter() @@ -304,16 +314,26 @@ impl TxData { self.truncates.iter().copied() } - /// Check if this [`TxData`] contains any `inserted | deleted` rows or `connect/disconnect` operations. + /// Check if this [`TxData`] contains any `inserted | deleted` rows + /// or `connect/disconnect` operations. + /// + /// Mutations of ephemeral tables are excluded, i.e. if the transaction + /// modifies only ephemeral tables (and is not a connect/disconnect operation), + /// the method returns `false`. /// /// This is used to determine if a transaction should be written to disk. 
pub fn has_rows_or_connect_disconnect(&self, reducer_context: Option<&ReducerContext>) -> bool { - self.inserts().any(|(_, inserted_rows)| !inserted_rows.is_empty()) - || self.deletes().any(|(.., deleted_rows)| !deleted_rows.is_empty()) - || matches!( - reducer_context.map(|rcx| rcx.name.strip_prefix("__identity_")), - Some(Some("connected__" | "disconnected__")) - ) + let is_non_ephemeral_mutation = + |(table_id, rows): (_, &Arc<[_]>)| !(self.is_ephemeral_table(table_id) || rows.is_empty()); + + let has_inserts = self.inserts().any(is_non_ephemeral_mutation); + let has_deletes = self.deletes().any(is_non_ephemeral_mutation); + let is_connect_disconnect = matches!( + reducer_context.map(|rcx| rcx.name.strip_prefix("__identity_")), + Some(Some("connected__" | "disconnected__")) + ); + + has_inserts || has_deletes || is_connect_disconnect } /// Returns a list of tables affected in this transaction. From e7fbc3f0b0adf2b6512ba74c39f3d27d9073afed Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 16 Dec 2025 09:58:49 +0100 Subject: [PATCH 2/5] Apply suggestions from code review Co-authored-by: Mazdak Farrokhzad Signed-off-by: Kim Altintop --- crates/core/src/db/relational_db.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index bf881ecefac..e110670c5c5 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -818,7 +818,7 @@ impl RelationalDB { Txdata, }; - let is_not_ephemeral_table = |table_id: &TableId| -> bool { !tx_data.is_ephemeral_table(table_id) }; + let is_persistent_table = |table_id: &TableId| -> bool { !tx_data.is_ephemeral_table(table_id) }; if tx_data.tx_offset().is_some() { let inserts: Box<_> = tx_data @@ -844,7 +844,7 @@ impl RelationalDB { .filter(|ops| !truncates.contains(&ops.table_id)) .collect(); - let truncates: Box<_> = truncates.into_iter().filter(is_not_ephemeral_table).collect(); + let truncates: Box<_> = 
truncates.into_iter().filter(is_persistent_table).collect(); let inputs = reducer_context.map(|rcx| rcx.into()); From 775d04ff81c3a5ed4a347989de0a76ad4ecd46c5 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 16 Dec 2025 09:59:05 +0100 Subject: [PATCH 3/5] Update crates/core/src/db/relational_db.rs Co-authored-by: Mazdak Farrokhzad Signed-off-by: Kim Altintop --- crates/core/src/db/relational_db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index e110670c5c5..18bc7a6a61e 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -824,7 +824,7 @@ impl RelationalDB { let inserts: Box<_> = tx_data .inserts() // Skip ephemeral tables - .filter(|(table_id, _)| is_not_ephemeral_table(table_id)) + .filter(|(table_id, _)| is_persistent_table(table_id)) .map(|(table_id, rowdata)| Ops { table_id: *table_id, rowdata: rowdata.clone(), From 8b60696d6adf5fa8189bcde10f13591d771ee033 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 16 Dec 2025 09:59:13 +0100 Subject: [PATCH 4/5] Update crates/core/src/db/relational_db.rs Co-authored-by: Mazdak Farrokhzad Signed-off-by: Kim Altintop --- crates/core/src/db/relational_db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 18bc7a6a61e..9baa64585a5 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -835,7 +835,7 @@ impl RelationalDB { let deletes: Box<_> = tx_data .deletes() - .filter(|(table_id, _)| is_not_ephemeral_table(table_id)) + .filter(|(table_id, _)| is_persistent_table(table_id)) .map(|(table_id, rowdata)| Ops { table_id: *table_id, rowdata: rowdata.clone(), From 83c858fa78ec527a202358dee559513a51b0c03f Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 16 Dec 2025 10:01:18 +0100 Subject: [PATCH 5/5] Ensure predicates for 
`has_rows_or_connect_disconnect` are evaluated lazily --- crates/datastore/src/traits.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index 644d0032fae..defa6c67de3 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -326,14 +326,12 @@ impl TxData { let is_non_ephemeral_mutation = |(table_id, rows): (_, &Arc<[_]>)| !(self.is_ephemeral_table(table_id) || rows.is_empty()); - let has_inserts = self.inserts().any(is_non_ephemeral_mutation); - let has_deletes = self.deletes().any(is_non_ephemeral_mutation); - let is_connect_disconnect = matches!( - reducer_context.map(|rcx| rcx.name.strip_prefix("__identity_")), - Some(Some("connected__" | "disconnected__")) - ); - - has_inserts || has_deletes || is_connect_disconnect + self.inserts().any(is_non_ephemeral_mutation) + || self.deletes().any(is_non_ephemeral_mutation) + || matches!( + reducer_context.map(|rcx| rcx.name.strip_prefix("__identity_")), + Some(Some("connected__" | "disconnected__")) + ) } /// Returns a list of tables affected in this transaction.