Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,18 +818,13 @@ impl RelationalDB {
Txdata,
};

let is_not_ephemeral_table = |table_id: &TableId| -> bool {
tx_data
.ephemeral_tables()
.map(|etables| !etables.contains(table_id))
.unwrap_or(true)
};
let is_persistent_table = |table_id: &TableId| -> bool { !tx_data.is_ephemeral_table(table_id) };

if tx_data.tx_offset().is_some() {
let inserts: Box<_> = tx_data
.inserts()
// Skip ephemeral tables
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
.filter(|(table_id, _)| is_persistent_table(table_id))
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
Expand All @@ -840,7 +835,7 @@ impl RelationalDB {

let deletes: Box<_> = tx_data
.deletes()
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
.filter(|(table_id, _)| is_persistent_table(table_id))
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
Expand All @@ -849,10 +844,15 @@ impl RelationalDB {
.filter(|ops| !truncates.contains(&ops.table_id))
.collect();

let truncates = truncates.into_iter().filter(is_not_ephemeral_table).collect();
let truncates: Box<_> = truncates.into_iter().filter(is_persistent_table).collect();

let inputs = reducer_context.map(|rcx| rcx.into());

debug_assert!(
!(inserts.is_empty() && truncates.is_empty() && deletes.is_empty() && inputs.is_none()),
"empty transaction"
);

let txdata = Txdata {
inputs,
outputs: None,
Expand Down Expand Up @@ -2632,6 +2632,36 @@ mod tests {
Ok(())
}

#[test]
fn test_view_materialization_does_not_consume_tx_offset() -> ResultTest<()> {
let stdb = TestDB::durable_without_snapshot_repo()?;

let (tx_offset_1, view_id, table_id) = {
let module_def = view_module_def();
let view_def = module_def.view("my_view").unwrap();

let mut tx = begin_mut_tx(&stdb);
let (view_id, table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
let (tx_offset, tx_data, ..) = stdb.commit_tx(tx)?.unwrap();
assert_eq!(Some(tx_offset), tx_data.tx_offset());

(tx_offset, view_id, table_id)
};

let mut tx = begin_mut_tx(&stdb);
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
stdb.materialize_view(&mut tx, table_id, Identity::ONE, vec![product![10u8]])?;
let (tx_offset_2, tx_data, ..) = stdb.commit_tx(tx)?.unwrap();

// `tx_data.tx_offset()` should return `None`,
// so that it is not considered for durability.
// The tx offset reported for confirmed reads should stay the same.
assert!(tx_data.tx_offset().is_none());
assert_eq!(tx_offset_1, tx_offset_2);

Ok(())
}

#[test]
fn test_view_tables_are_ephemeral_with_snapshot() -> ResultTest<()> {
let stdb = TestDB::durable()?;
Expand Down
7 changes: 5 additions & 2 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,15 +747,18 @@ impl CommittedState {
// so that we can pass updated set of table ids.
self.merge_read_sets(read_sets);

// Store in `tx_data` which of the updated tables are ephemeral.
// NOTE: This must be called before `tx_consumes_offset`, so that
// all-ephemeral transactions do not consume a tx offset.
tx_data.set_ephemeral_tables(&self.ephemeral_tables);

// If the TX will be logged, record its projected tx offset,
// then increment the counter.
if self.tx_consumes_offset(&tx_data, ctx) {
tx_data.set_tx_offset(self.next_tx_offset);
self.next_tx_offset += 1;
}

tx_data.set_ephemeral_tables(&self.ephemeral_tables);

tx_data
}

Expand Down
26 changes: 22 additions & 4 deletions crates/datastore/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl TxData {
/// Determines which ephemeral tables were modified in this transaction.
///
/// Iterates over the tables updated in this transaction and records those that
/// also appear in `all_ephemeral_tables`.
/// also appear in `all_ephemeral_tables`.
/// `self.ephemeral_tables` remains `None` if no ephemeral tables were modified.
pub fn set_ephemeral_tables(&mut self, all_ephemeral_tables: &EphemeralTables) {
for tid in self.tables.keys() {
Expand All @@ -252,6 +252,16 @@ impl TxData {
self.ephemeral_tables.as_ref()
}

/// Check if `table_id` is in the set of ephemeral tables for this transaction.
///
/// Beware that ephemeral tables are known only after [Self::set_ephemeral_tables]
/// has been called.
pub fn is_ephemeral_table(&self, table_id: &TableId) -> bool {
self.ephemeral_tables
.as_ref()
.is_some_and(|etables| etables.contains(table_id))
}

/// Obtain an iterator over the inserted rows per table.
pub fn inserts(&self) -> impl Iterator<Item = (&TableId, &Arc<[ProductValue]>)> + '_ {
self.inserts.iter()
Expand Down Expand Up @@ -304,12 +314,20 @@ impl TxData {
self.truncates.iter().copied()
}

/// Check if this [`TxData`] contains any `inserted | deleted` rows or `connect/disconnect` operations.
/// Check if this [`TxData`] contains any `inserted | deleted` rows
/// or `connect/disconnect` operations.
///
/// Mutations of ephemeral tables are excluded, i.e. if the transaction
/// modifies only ephemeral tables (and is not a connect/disconnect operation),
/// the method returns `false`.
///
/// This is used to determine if a transaction should be written to disk.
pub fn has_rows_or_connect_disconnect(&self, reducer_context: Option<&ReducerContext>) -> bool {
self.inserts().any(|(_, inserted_rows)| !inserted_rows.is_empty())
|| self.deletes().any(|(.., deleted_rows)| !deleted_rows.is_empty())
let is_non_ephemeral_mutation =
|(table_id, rows): (_, &Arc<[_]>)| !(self.is_ephemeral_table(table_id) || rows.is_empty());

self.inserts().any(is_non_ephemeral_mutation)
|| self.deletes().any(is_non_ephemeral_mutation)
|| matches!(
reducer_context.map(|rcx| rcx.name.strip_prefix("__identity_")),
Some(Some("connected__" | "disconnected__"))
Expand Down