diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 99f57806247..9baa64585a5 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -818,18 +818,13 @@ impl RelationalDB { Txdata, }; - let is_not_ephemeral_table = |table_id: &TableId| -> bool { - tx_data - .ephemeral_tables() - .map(|etables| !etables.contains(table_id)) - .unwrap_or(true) - }; + let is_persistent_table = |table_id: &TableId| -> bool { !tx_data.is_ephemeral_table(table_id) }; if tx_data.tx_offset().is_some() { let inserts: Box<_> = tx_data .inserts() // Skip ephemeral tables - .filter(|(table_id, _)| is_not_ephemeral_table(table_id)) + .filter(|(table_id, _)| is_persistent_table(table_id)) .map(|(table_id, rowdata)| Ops { table_id: *table_id, rowdata: rowdata.clone(), @@ -840,7 +835,7 @@ impl RelationalDB { let deletes: Box<_> = tx_data .deletes() - .filter(|(table_id, _)| is_not_ephemeral_table(table_id)) + .filter(|(table_id, _)| is_persistent_table(table_id)) .map(|(table_id, rowdata)| Ops { table_id: *table_id, rowdata: rowdata.clone(), @@ -849,10 +844,15 @@ impl RelationalDB { .filter(|ops| !truncates.contains(&ops.table_id)) .collect(); - let truncates = truncates.into_iter().filter(is_not_ephemeral_table).collect(); + let truncates: Box<_> = truncates.into_iter().filter(is_persistent_table).collect(); let inputs = reducer_context.map(|rcx| rcx.into()); + debug_assert!( + !(inserts.is_empty() && truncates.is_empty() && deletes.is_empty() && inputs.is_none()), + "empty transaction" + ); + let txdata = Txdata { inputs, outputs: None, @@ -2632,6 +2632,36 @@ mod tests { Ok(()) } + #[test] + fn test_view_materialization_does_not_consume_tx_offset() -> ResultTest<()> { + let stdb = TestDB::durable_without_snapshot_repo()?; + + let (tx_offset_1, view_id, table_id) = { + let module_def = view_module_def(); + let view_def = module_def.view("my_view").unwrap(); + + let mut tx = begin_mut_tx(&stdb); + let (view_id, table_id) = stdb.create_view(&mut tx, &module_def, view_def)?; + let (tx_offset, tx_data, ..) = stdb.commit_tx(tx)?.unwrap(); + assert_eq!(Some(tx_offset), tx_data.tx_offset()); + + (tx_offset, view_id, table_id) + }; + + let mut tx = begin_mut_tx(&stdb); + tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?; + stdb.materialize_view(&mut tx, table_id, Identity::ONE, vec![product![10u8]])?; + let (tx_offset_2, tx_data, ..) = stdb.commit_tx(tx)?.unwrap(); + + // `tx_data.tx_offset()` should return `None`, + // so that it is not considered for durability. + // The tx offset reported for confirmed reads should stay the same. + assert!(tx_data.tx_offset().is_none()); + assert_eq!(tx_offset_1, tx_offset_2); + + Ok(()) + } + #[test] fn test_view_tables_are_ephemeral_with_snapshot() -> ResultTest<()> { let stdb = TestDB::durable()?; diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 41e5615c18e..c5a3a9d1a8b 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -747,6 +747,11 @@ impl CommittedState { // so that we can pass updated set of table ids. self.merge_read_sets(read_sets); + // Store in `tx_data` which of the updated tables are ephemeral. + // NOTE: This must be called before `tx_consumes_offset`, so that + // all-ephemeral transactions do not consume a tx offset. + tx_data.set_ephemeral_tables(&self.ephemeral_tables); + // If the TX will be logged, record its projected tx offset, // then increment the counter. if self.tx_consumes_offset(&tx_data, ctx) { @@ -754,8 +759,6 @@ impl CommittedState { self.next_tx_offset += 1; } - tx_data.set_ephemeral_tables(&self.ephemeral_tables); - tx_data } diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index e1161b5a11b..defa6c67de3 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -236,7 +236,7 @@ impl TxData { /// Determines which ephemeral tables were modified in this transaction. /// /// Iterates over the tables updated in this transaction and records those that - /// also appear in `all_ephemeral_tables`. + /// also appear in `all_ephemeral_tables`. /// `self.ephemeral_tables` remains `None` if no ephemeral tables were modified. pub fn set_ephemeral_tables(&mut self, all_ephemeral_tables: &EphemeralTables) { for tid in self.tables.keys() { @@ -252,6 +252,16 @@ impl TxData { self.ephemeral_tables.as_ref() } + /// Check if `table_id` is in the set of ephemeral tables for this transaction. + /// + /// Beware that ephemeral tables are known only after [Self::set_ephemeral_tables] + /// has been called. + pub fn is_ephemeral_table(&self, table_id: &TableId) -> bool { + self.ephemeral_tables + .as_ref() + .is_some_and(|etables| etables.contains(table_id)) + } + /// Obtain an iterator over the inserted rows per table. pub fn inserts(&self) -> impl Iterator)> + '_ { self.inserts.iter() @@ -304,12 +314,20 @@ impl TxData { self.truncates.iter().copied() } - /// Check if this [`TxData`] contains any `inserted | deleted` rows or `connect/disconnect` operations. + /// Check if this [`TxData`] contains any `inserted | deleted` rows + /// or `connect/disconnect` operations. + /// + /// Mutations of ephemeral tables are excluded, i.e. if the transaction + /// modifies only ephemeral tables (and is not a connect/disconnect operation), + /// the method returns `false`. /// /// This is used to determine if a transaction should be written to disk. pub fn has_rows_or_connect_disconnect(&self, reducer_context: Option<&ReducerContext>) -> bool { - self.inserts().any(|(_, inserted_rows)| !inserted_rows.is_empty()) - || self.deletes().any(|(.., deleted_rows)| !deleted_rows.is_empty()) + let is_non_ephemeral_mutation = + |(table_id, rows): (_, &Arc<[_]>)| !(self.is_ephemeral_table(table_id) || rows.is_empty()); + + self.inserts().any(is_non_ephemeral_mutation) + || self.deletes().any(is_non_ephemeral_mutation) || matches!( reducer_context.map(|rcx| rcx.name.strip_prefix("__identity_")), Some(Some("connected__" | "disconnected__"))