3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

138 changes: 66 additions & 72 deletions crates/core/src/db/durability.rs
@@ -1,12 +1,7 @@
-use std::{
-    sync::{
-        atomic::{AtomicU64, Ordering},
-        Arc,
-    },
-    time::Duration,
-};
+use std::{sync::Arc, time::Duration};
 
-use log::{error, info, warn};
+use futures::TryFutureExt as _;
+use log::{error, info};
 use spacetimedb_commitlog::payload::{
     txdata::{Mutations, Ops},
     Txdata,
@@ -18,27 +13,32 @@ use spacetimedb_lib::Identity;
 use spacetimedb_primitives::TableId;
 use tokio::{
     runtime,
-    sync::mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
-    time::{timeout, Instant},
+    sync::{
+        futures::OwnedNotified,
+        mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
+        oneshot, Notify,
+    },
+    time::timeout,
 };
 
-use crate::db::{lock_file::LockFile, persistence::Durability};
+use crate::db::persistence::Durability;
 
 /// A request to persist a transaction or to terminate the actor.
 pub struct DurabilityRequest {
     reducer_context: Option<ReducerContext>,
     tx_data: Arc<TxData>,
 }
 
+type ShutdownReply = oneshot::Sender<OwnedNotified>;
+
 /// Represents a handle to a background task that persists transactions
 /// according to the [`Durability`] policy provided.
 ///
 /// This exists to avoid holding a transaction lock while
 /// preparing the [TxData] for processing by the [Durability] layer.
 pub struct DurabilityWorker {
     request_tx: UnboundedSender<DurabilityRequest>,
-    requested_tx_offset: AtomicU64,
-    shutdown: Sender<()>,
+    shutdown: Sender<ShutdownReply>,
     durability: Arc<Durability>,
     runtime: runtime::Handle,
 }
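The `ShutdownReply` alias above encodes a three-step handshake: the caller sends a `oneshot::Sender`, the actor answers it with an `OwnedNotified`, and that future resolves once the actor has wound down. A minimal self-contained sketch of the same shape (illustrative names, not this crate's API; it assumes the `scopeguard` crate and a tokio recent enough to have `Notify::notified_owned`, both of which the diff itself relies on):

```rust
use std::sync::Arc;

use tokio::sync::{futures::OwnedNotified, mpsc, oneshot, Notify};

type ShutdownReply = oneshot::Sender<OwnedNotified>;

// Stand-in actor: on a shutdown request, hand back a future that
// resolves once this task has finished its remaining work.
async fn actor(mut shutdown_rx: mpsc::Receiver<ShutdownReply>) {
    // `notify_one` stores a permit, so the waiter is woken even if it
    // polls its `OwnedNotified` only after this guard has fired.
    let done = scopeguard::guard(Arc::new(Notify::new()), |done| done.notify_one());
    if let Some(reply) = shutdown_rx.recv().await {
        let _ = reply.send(done.clone().notified_owned());
    }
    // ... drain queued work here; the guard fires on return ...
}

#[tokio::main]
async fn main() {
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<ShutdownReply>(1);
    tokio::spawn(actor(shutdown_rx));

    let (done_tx, done_rx) = oneshot::channel();
    shutdown_tx.send(done_tx).await.expect("actor is alive");
    let exited = done_rx.await.expect("actor replied");
    exited.await; // resolves once the actor has wound down
}
```

The sketch signals completion with `notify_one`, whose stored permit survives a late poll; the actor below uses `notify_waiters` so that every handed-out `OwnedNotified` is woken at once.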
@@ -64,7 +64,6 @@ impl DurabilityWorker {
 
         Self {
             request_tx,
-            requested_tx_offset: AtomicU64::new(0),
             shutdown: shutdown_tx,
             durability,
             runtime,
@@ -97,14 +96,6 @@ impl DurabilityWorker {
                 reducer_context,
                 tx_data: tx_data.clone(),
             })
-            .inspect(|()| {
-                // If `tx_data` has a `None` tx offset, the actor will ignore it.
-                // Otherwise, record the offset as requested, so that
-                // [Self::shutdown] can determine when the queue is drained.
-                if let Some(tx_offset) = tx_data.tx_offset() {
-                    self.requested_tx_offset.fetch_max(tx_offset, Ordering::SeqCst);
-                }
-            })
             .expect(HUNG_UP);
     }
 
@@ -121,22 +112,26 @@ impl DurabilityWorker {
     ///
     /// # Panics
     ///
-    /// If [Self::request_durability] is called after [Self::shutdown], the
-    /// former will panic.
-    pub async fn shutdown(&self) -> anyhow::Result<TxOffset> {
-        // Request the actor to shutdown.
-        // Ignore send errors -- in that case a shutdown is already in progress.
-        let _ = self.shutdown.try_send(());
-        // Wait for the request channel to be closed.
-        self.request_tx.closed().await;
-        // Load the latest tx offset and wait for it to become durable.
-        let latest_tx_offset = self.requested_tx_offset.load(Ordering::SeqCst);
-        let durable_offset = self.durable_tx_offset().wait_for(latest_tx_offset).await?;
-
-        Ok(durable_offset)
+    /// After this method was called, calling [Self::request_durability]
+    /// will panic.
+    pub async fn close(&self) -> Option<TxOffset> {
+        let (done_tx, done_rx) = oneshot::channel();
+        // Channel errors can be ignored.
+        // It just means that the actor already exited.
+        let _ = self
+            .shutdown
+            .send(done_tx)
+            .map_err(drop)
+            .and_then(|()| done_rx.map_err(drop))
+            .and_then(|done| async move {
+                done.await;
+                Ok(())
+            })
+            .await;
+        self.durability.close().await
    }
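`close()` above chains two channel round-trips with `futures::TryFutureExt`. The same shape reduced to a runnable sketch (stand-in types, not this crate's API): a failure at either step only means the actor is already gone, so `map_err(drop)` collapses both error types to `()` and the chain short-circuits without caring which step failed.

```rust
use futures::TryFutureExt as _;
use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<oneshot::Sender<u64>>(1);

    // Stand-in actor: answer one shutdown request with an offset.
    tokio::spawn(async move {
        if let Some(reply) = shutdown_rx.recv().await {
            let _ = reply.send(42);
        }
    });

    let (done_tx, done_rx) = oneshot::channel();
    // Failure at either step only means the actor is already gone,
    // so both error types are collapsed to `()` and discarded.
    let result = shutdown_tx
        .send(done_tx)
        .map_err(drop)
        .and_then(|()| done_rx.map_err(drop))
        .await;
    assert_eq!(result, Ok(42));
}
```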

-    /// Consume `self` and run [Self::shutdown].
+    /// Consume `self` and run [Self::close].
     ///
     /// The `lock_file` is not dropped until the shutdown is complete (either
     /// successfully or unsuccessfully). This is to prevent the database to be
@@ -151,59 +146,44 @@ impl DurabilityWorker {
     /// owning this durability worker.
     ///
     /// This method is used in the `Drop` impl for [crate::db::relational_db::RelationalDB].
-    pub(super) fn spawn_shutdown(self, database_identity: Identity, lock_file: LockFile) {
+    pub(super) fn spawn_close(self, database_identity: Identity) {
         let rt = self.runtime.clone();
-        let mut shutdown = rt.spawn(async move { self.shutdown().await });
         rt.spawn(async move {
             let label = format!("[{database_identity}]");
-            let start = Instant::now();
-            loop {
-                // Warn every 5s if the shutdown doesn't appear to make progress.
-                // The backing durability could still be writing to disk,
-                // but we can't cancel it from here,
-                // so dropping the lock file would be unsafe.
-                match timeout(Duration::from_secs(5), &mut shutdown).await {
-                    Err(_elapsed) => {
-                        let since = start.elapsed().as_secs_f32();
-                        error!("{label} waiting for durability worker shutdown since {since}s",);
-                        continue;
-                    }
-                    Ok(res) => {
-                        let Ok(done) = res else {
-                            warn!("{label} durability worker shutdown cancelled");
-                            break;
-                        };
-                        match done {
-                            Ok(offset) => info!("{label} durability worker shut down at tx offset: {offset}"),
-                            Err(e) => warn!("{label} error shutting down durability worker: {e:#}"),
-                        }
-                        break;
-                    }
+            // Apply a timeout, in case `Durability::close` doesn't terminate
+            // as advertised. This is a bug, but panicking here would not
+            // unwind at the call site.
+            match timeout(Duration::from_secs(10), self.close()).await {
+                Err(_elapsed) => {
+                    error!("{label} timeout waiting for durability worker shutdown");
                 }
+                Ok(offset) => {
+                    info!("{label} durability worker shut down at tx offset: {offset:?}");
                 }
             }
-            drop(lock_file);
         });
     }
 }
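Compared to the old warn-every-5s loop, `spawn_close` now bounds the whole shutdown with one 10-second `timeout` in a detached task. The pattern in isolation (the `close` future is a hypothetical stand-in):

```rust
use std::time::Duration;

use tokio::time::timeout;

#[tokio::main]
async fn main() {
    // Hypothetical stand-in for `DurabilityWorker::close()`,
    // which may in principle never resolve.
    let close = async { Some(10u64) };

    let task = tokio::spawn(async move {
        match timeout(Duration::from_secs(10), close).await {
            // Log rather than panic: this runs in a detached task,
            // so an unwinding panic would be invisible to the caller.
            Err(_elapsed) => eprintln!("timeout waiting for shutdown"),
            Ok(offset) => println!("shut down at tx offset: {offset:?}"),
        }
    });
    task.await.unwrap();
}
```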

 pub struct DurabilityWorkerActor {
     request_rx: UnboundedReceiver<DurabilityRequest>,
-    shutdown: Receiver<()>,
+    shutdown: Receiver<ShutdownReply>,
     durability: Arc<Durability>,
 }
 
 impl DurabilityWorkerActor {
     /// Processes requests to do durability.
     async fn run(mut self) {
+        let done = scopeguard::guard(Arc::new(Notify::new()), |done| done.notify_waiters());
         loop {
             tokio::select! {
                 // Biased towards the shutdown channel,
                 // so that adding new requests is prevented promptly.
                 biased;
 
-                Some(()) = self.shutdown.recv() => {
+                Some(reply) = self.shutdown.recv() => {
                     self.request_rx.close();
-                    self.shutdown.close();
+                    let _ = reply.send(done.clone().notified_owned());
                 },
 
                 req = self.request_rx.recv() => {
@@ -214,6 +194,8 @@ impl DurabilityWorkerActor {
                 }
             }
         }
+
+        info!("durability worker actor done");
     }
 
     pub fn do_durability(durability: &Durability, reducer_context: Option<ReducerContext>, tx_data: &TxData) {
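The `biased` select in `run` is what makes shutdown prompt: arms are polled in declaration order, so a pending shutdown is observed before the next request is dequeued, and `request_rx.close()` rejects new sends while already-queued requests still drain. A self-contained sketch of that loop shape (stand-in message types):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (req_tx, mut request_rx) = mpsc::unbounded_channel::<u32>();
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);

    // Three requests are already queued when shutdown is requested.
    for i in 0..3 {
        req_tx.send(i).unwrap();
    }
    shutdown_tx.send(()).await.unwrap();

    loop {
        tokio::select! {
            // `biased` polls the arms in order, so a pending shutdown
            // is observed before the next request is dequeued.
            biased;

            Some(()) = shutdown_rx.recv() => {
                // Refuse new requests; already-queued ones still drain.
                request_rx.close();
            }

            req = request_rx.recv() => {
                let Some(req) = req else { break };
                println!("processed request {req}");
            }
        }
    }
    println!("actor loop done");
}
```

Without `biased`, `select!` polls ready arms in random order, so the shutdown arm could be observed later than necessary.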
@@ -280,6 +262,7 @@ impl DurabilityWorkerActor {
 mod tests {
     use std::{pin::pin, task::Poll};
 
+    use futures::FutureExt as _;
     use pretty_assertions::assert_matches;
     use spacetimedb_sats::product;
     use tokio::sync::watch;
@@ -318,6 +301,22 @@ mod tests {
         fn durable_tx_offset(&self) -> DurableOffset {
             self.durable.subscribe().into()
         }
+
+        fn close(&self) -> spacetimedb_durability::Close {
+            let mut durable = self.durable.subscribe();
+            let appended = self.appended.subscribe();
+            async move {
+                let durable_offset = durable
+                    .wait_for(|durable| match (*durable).zip(*appended.borrow()) {
+                        Some((durable_offset, appended_offset)) => durable_offset >= appended_offset,
+                        None => false,
+                    })
+                    .await
+                    .unwrap();
+                *durable_offset
+            }
+            .boxed()
+        }
     }
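The test `close()` above is built on `watch::Receiver::wait_for`, which checks the current value and then suspends until an update satisfies the predicate. The idiom in isolation (a hypothetical offset channel):

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // Hypothetical channel carrying the latest durable offset.
    let (durable_tx, mut durable_rx) = watch::channel::<Option<u64>>(None);

    tokio::spawn(async move {
        for offset in 0..=10u64 {
            durable_tx.send(Some(offset)).unwrap();
        }
    });

    // Resolves once everything up to offset 10 is durable.
    let reached = durable_rx
        .wait_for(|durable| matches!(durable, Some(o) if *o >= 10))
        .await
        .unwrap();
    assert_eq!(*reached, Some(10));
}
```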

#[tokio::test]
Expand All @@ -333,13 +332,8 @@ mod tests {

worker.request_durability(None, &Arc::new(txdata));
}
assert_eq!(
10,
worker.requested_tx_offset.load(Ordering::Relaxed),
"worker should have requested up to tx offset 10"
);

let shutdown = worker.shutdown();
let shutdown = worker.close();
let mut shutdown_fut = pin!(shutdown);
assert_matches!(
futures::poll!(&mut shutdown_fut),
Expand All @@ -357,7 +351,7 @@ mod tests {
durability.mark_durable(10).await;
assert_matches!(
futures::poll!(&mut shutdown_fut),
Poll::Ready(Ok(10)),
Poll::Ready(Some(10)),
"shutdown returns, reporting durable offset at 10"
);
assert_eq!(
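The test above drives the `close()` future by hand: `pin!` plus `futures::poll!` assert that the future is still `Pending` before the durable offset catches up, without timers or spawned tasks. The same idiom against a bare `oneshot` channel (illustrative):

```rust
use std::{pin::pin, task::Poll};

#[tokio::test]
async fn pending_until_signaled() {
    let (tx, rx) = tokio::sync::oneshot::channel::<u64>();

    let mut fut = pin!(rx);
    // Not signaled yet: a manual poll must observe `Pending`.
    assert!(matches!(futures::poll!(&mut fut), Poll::Pending));

    tx.send(10).unwrap();
    // Signaled: the next poll resolves immediately.
    assert!(matches!(futures::poll!(&mut fut), Poll::Ready(Ok(10))));
}
```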
34 changes: 0 additions & 34 deletions crates/core/src/db/lock_file.rs

This file was deleted.

2 changes: 0 additions & 2 deletions crates/core/src/db/mod.rs
@@ -8,8 +8,6 @@ use spacetimedb_datastore::execution_context::WorkloadType;
 use spacetimedb_datastore::{locking_tx_datastore::datastore::TxMetrics, traits::TxData};
 
 mod durability;
-mod lock_file;
-use lock_file::LockFile;
 pub mod persistence;
 pub mod relational_db;
 pub mod snapshot;
3 changes: 1 addition & 2 deletions crates/core/src/db/persistence.rs
@@ -142,15 +142,14 @@ impl LocalPersistenceProvider {
 impl PersistenceProvider for LocalPersistenceProvider {
     async fn persistence(&self, database: &Database, replica_id: u64) -> anyhow::Result<Persistence> {
         let replica_dir = self.data_dir.replica(replica_id);
-        let commitlog_dir = replica_dir.commit_log();
         let snapshot_dir = replica_dir.snapshots();
 
         let database_identity = database.database_identity;
         let snapshot_worker =
             asyncify(move || relational_db::open_snapshot_repo(snapshot_dir, database_identity, replica_id))
                 .await
                 .map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled))?;
-        let (durability, disk_size) = relational_db::local_durability(commitlog_dir, Some(&snapshot_worker)).await?;
+        let (durability, disk_size) = relational_db::local_durability(replica_dir, Some(&snapshot_worker)).await?;
 
         tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
             snapshot_worker.subscribe(),