From 8d0b43062d27bad5881215f2efe9329ef80a7a29 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 16 Dec 2025 13:14:31 +0100 Subject: [PATCH] Create `db.lock` file only for persistent databases This is the first step to make in-memory only databases not touch the disk at all. Pending is an in-memory only sink for module logs. Responsibility for the lock file is transferred to `Durability`, which means that only persistent databases opened for writing acquire the lock. As a consequence, the `Durability` trait gains a `close` method that prevents further writes and drains the internal buffers, even when multiple `Arc`-pointers to the `Durability` exist. --- Cargo.lock | 3 + crates/core/src/db/durability.rs | 138 ++++---- crates/core/src/db/lock_file.rs | 34 -- crates/core/src/db/mod.rs | 2 - crates/core/src/db/persistence.rs | 3 +- crates/core/src/db/relational_db.rs | 194 ++++++----- crates/core/src/error.rs | 20 +- crates/core/src/host/host_controller.rs | 15 +- .../subscription/module_subscription_actor.rs | 23 +- crates/durability/Cargo.toml | 2 + crates/durability/src/imp/local.rs | 326 ++++++++++-------- crates/durability/src/imp/mod.rs | 23 +- crates/durability/src/lib.rs | 26 ++ crates/fs-utils/Cargo.toml | 3 +- crates/fs-utils/src/lockfile.rs | 82 +++++ crates/snapshot/tests/remote.rs | 9 +- 16 files changed, 522 insertions(+), 381 deletions(-) delete mode 100644 crates/core/src/db/lock_file.rs diff --git a/Cargo.lock b/Cargo.lock index 05af473dd53..9b2314b673d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7754,10 +7754,12 @@ name = "spacetimedb-durability" version = "1.11.0" dependencies = [ "anyhow", + "futures", "itertools 0.12.1", "log", "scopeguard", "spacetimedb-commitlog", + "spacetimedb-fs-utils", "spacetimedb-paths", "spacetimedb-sats 1.11.0", "tempfile", @@ -7805,6 +7807,7 @@ name = "spacetimedb-fs-utils" version = "1.11.0" dependencies = [ "anyhow", + "fs2", "hex", "rand 0.9.2", "tempdir", diff --git a/crates/core/src/db/durability.rs b/crates/core/src/db/durability.rs index 760e27d6526..0ea88bdb4b9 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/core/src/db/durability.rs @@ -1,12 +1,7 @@ -use std::{ - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; -use log::{error, info, warn}; +use futures::TryFutureExt as _; +use log::{error, info}; use spacetimedb_commitlog::payload::{ txdata::{Mutations, Ops}, Txdata, @@ -18,11 +13,15 @@ use spacetimedb_lib::Identity; use spacetimedb_primitives::TableId; use tokio::{ runtime, - sync::mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender}, - time::{timeout, Instant}, + sync::{ + futures::OwnedNotified, + mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender}, + oneshot, Notify, + }, + time::timeout, }; -use crate::db::{lock_file::LockFile, persistence::Durability}; +use crate::db::persistence::Durability; /// A request to persist a transaction or to terminate the actor. pub struct DurabilityRequest { @@ -30,6 +29,8 @@ pub struct DurabilityRequest { tx_data: Arc, } +type ShutdownReply = oneshot::Sender; + /// Represents a handle to a background task that persists transactions /// according to the [`Durability`] policy provided. /// @@ -37,8 +38,7 @@ pub struct DurabilityRequest { /// preparing the [TxData] for processing by the [Durability] layer. 
pub struct DurabilityWorker { request_tx: UnboundedSender, - requested_tx_offset: AtomicU64, - shutdown: Sender<()>, + shutdown: Sender, durability: Arc, runtime: runtime::Handle, } @@ -64,7 +64,6 @@ impl DurabilityWorker { Self { request_tx, - requested_tx_offset: AtomicU64::new(0), shutdown: shutdown_tx, durability, runtime, @@ -97,14 +96,6 @@ impl DurabilityWorker { reducer_context, tx_data: tx_data.clone(), }) - .inspect(|()| { - // If `tx_data` has a `None` tx offset, the actor will ignore it. - // Otherwise, record the offset as requested, so that - // [Self::shutdown] can determine when the queue is drained. - if let Some(tx_offset) = tx_data.tx_offset() { - self.requested_tx_offset.fetch_max(tx_offset, Ordering::SeqCst); - } - }) .expect(HUNG_UP); } @@ -121,22 +112,26 @@ impl DurabilityWorker { /// /// # Panics /// - /// If [Self::request_durability] is called after [Self::shutdown], the - /// former will panic. - pub async fn shutdown(&self) -> anyhow::Result { - // Request the actor to shutdown. - // Ignore send errors -- in that case a shutdown is already in progress. - let _ = self.shutdown.try_send(()); - // Wait for the request channel to be closed. - self.request_tx.closed().await; - // Load the latest tx offset and wait for it to become durable. - let latest_tx_offset = self.requested_tx_offset.load(Ordering::SeqCst); - let durable_offset = self.durable_tx_offset().wait_for(latest_tx_offset).await?; - - Ok(durable_offset) + /// After this method was called, calling [Self::request_durability] + /// will panic. + pub async fn close(&self) -> Option { + let (done_tx, done_rx) = oneshot::channel(); + // Channel errors can be ignored. + // It just means that the actor already exited. + let _ = self + .shutdown + .send(done_tx) + .map_err(drop) + .and_then(|()| done_rx.map_err(drop)) + .and_then(|done| async move { + done.await; + Ok(()) + }) + .await; + self.durability.close().await } - /// Consume `self` and run [Self::shutdown]. + /// Consume `self` and run [Self::close]. /// /// The `lock_file` is not dropped until the shutdown is complete (either /// successfully or unsuccessfully). This is to prevent the database to be @@ -151,59 +146,44 @@ impl DurabilityWorker { /// owning this durability worker. /// /// This method is used in the `Drop` impl for [crate::db::relational_db::RelationalDB]. - pub(super) fn spawn_shutdown(self, database_identity: Identity, lock_file: LockFile) { + pub(super) fn spawn_close(self, database_identity: Identity) { let rt = self.runtime.clone(); - let mut shutdown = rt.spawn(async move { self.shutdown().await }); rt.spawn(async move { let label = format!("[{database_identity}]"); - let start = Instant::now(); - loop { - // Warn every 5s if the shutdown doesn't appear to make progress. - // The backing durability could still be writing to disk, - // but we can't cancel it from here, - // so dropping the lock file would be unsafe. - match timeout(Duration::from_secs(5), &mut shutdown).await { - Err(_elapsed) => { - let since = start.elapsed().as_secs_f32(); - error!("{label} waiting for durability worker shutdown since {since}s",); - continue; - } - Ok(res) => { - let Ok(done) = res else { - warn!("{label} durability worker shutdown cancelled"); - break; - }; - match done { - Ok(offset) => info!("{label} durability worker shut down at tx offset: {offset}"), - Err(e) => warn!("{label} error shutting down durability worker: {e:#}"), - } - break; - } + // Apply a timeout, in case `Durability::close` doesn't terminate + // as advertised. 
This is a bug, but panicking here would not + // unwind at the call site. + match timeout(Duration::from_secs(10), self.close()).await { + Err(_elapsed) => { + error!("{label} timeout waiting for durability worker shutdown"); + } + Ok(offset) => { + info!("{label} durability worker shut down at tx offset: {offset:?}"); } } - drop(lock_file); }); } } pub struct DurabilityWorkerActor { request_rx: UnboundedReceiver, - shutdown: Receiver<()>, + shutdown: Receiver, durability: Arc, } impl DurabilityWorkerActor { /// Processes requests to do durability. async fn run(mut self) { + let done = scopeguard::guard(Arc::new(Notify::new()), |done| done.notify_waiters()); loop { tokio::select! { // Biased towards the shutdown channel, // so that adding new requests is prevented promptly. biased; - Some(()) = self.shutdown.recv() => { + Some(reply) = self.shutdown.recv() => { self.request_rx.close(); - self.shutdown.close(); + let _ = reply.send(done.clone().notified_owned()); }, req = self.request_rx.recv() => { @@ -214,6 +194,8 @@ impl DurabilityWorkerActor { } } } + + info!("durability worker actor done"); } pub fn do_durability(durability: &Durability, reducer_context: Option, tx_data: &TxData) { @@ -280,6 +262,7 @@ impl DurabilityWorkerActor { mod tests { use std::{pin::pin, task::Poll}; + use futures::FutureExt as _; use pretty_assertions::assert_matches; use spacetimedb_sats::product; use tokio::sync::watch; @@ -318,6 +301,22 @@ mod tests { fn durable_tx_offset(&self) -> DurableOffset { self.durable.subscribe().into() } + + fn close(&self) -> spacetimedb_durability::Close { + let mut durable = self.durable.subscribe(); + let appended = self.appended.subscribe(); + async move { + let durable_offset = durable + .wait_for(|durable| match (*durable).zip(*appended.borrow()) { + Some((durable_offset, appended_offset)) => durable_offset >= appended_offset, + None => false, + }) + .await + .unwrap(); + *durable_offset + } + .boxed() + } } #[tokio::test] @@ -333,13 +332,8 @@ mod tests { worker.request_durability(None, &Arc::new(txdata)); } - assert_eq!( - 10, - worker.requested_tx_offset.load(Ordering::Relaxed), - "worker should have requested up to tx offset 10" - ); - let shutdown = worker.shutdown(); + let shutdown = worker.close(); let mut shutdown_fut = pin!(shutdown); assert_matches!( futures::poll!(&mut shutdown_fut), @@ -357,7 +351,7 @@ mod tests { durability.mark_durable(10).await; assert_matches!( futures::poll!(&mut shutdown_fut), - Poll::Ready(Ok(10)), + Poll::Ready(Some(10)), "shutdown returns, reporting durable offset at 10" ); assert_eq!( diff --git a/crates/core/src/db/lock_file.rs b/crates/core/src/db/lock_file.rs deleted file mode 100644 index c18642c6582..00000000000 --- a/crates/core/src/db/lock_file.rs +++ /dev/null @@ -1,34 +0,0 @@ -use std::{fmt, fs::File, path::Path, sync::Arc}; - -use fs2::FileExt as _; -use spacetimedb_paths::server::ReplicaDir; - -use crate::error::{DBError, DatabaseError}; - -#[derive(Clone)] -pub struct LockFile { - path: Arc, - #[allow(unused)] - lock: Arc, -} - -impl LockFile { - pub fn lock(root: &ReplicaDir) -> Result { - root.create()?; - let path = root.0.join("db.lock"); - let lock = File::create(&path)?; - lock.try_lock_exclusive() - .map_err(|e| DatabaseError::DatabasedOpened(root.0.clone(), e.into()))?; - - Ok(Self { - path: path.into(), - lock: lock.into(), - }) - } -} - -impl fmt::Debug for LockFile { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("LockFile").field("path", &self.path).finish() - } -} diff --git 
a/crates/core/src/db/mod.rs b/crates/core/src/db/mod.rs index 22cc0a4deda..993588004e7 100644 --- a/crates/core/src/db/mod.rs +++ b/crates/core/src/db/mod.rs @@ -8,8 +8,6 @@ use spacetimedb_datastore::execution_context::WorkloadType; use spacetimedb_datastore::{locking_tx_datastore::datastore::TxMetrics, traits::TxData}; mod durability; -mod lock_file; -use lock_file::LockFile; pub mod persistence; pub mod relational_db; pub mod snapshot; diff --git a/crates/core/src/db/persistence.rs b/crates/core/src/db/persistence.rs index 7be1d9016fa..e837506da38 100644 --- a/crates/core/src/db/persistence.rs +++ b/crates/core/src/db/persistence.rs @@ -142,7 +142,6 @@ impl LocalPersistenceProvider { impl PersistenceProvider for LocalPersistenceProvider { async fn persistence(&self, database: &Database, replica_id: u64) -> anyhow::Result { let replica_dir = self.data_dir.replica(replica_id); - let commitlog_dir = replica_dir.commit_log(); let snapshot_dir = replica_dir.snapshots(); let database_identity = database.database_identity; @@ -150,7 +149,7 @@ impl PersistenceProvider for LocalPersistenceProvider { asyncify(move || relational_db::open_snapshot_repo(snapshot_dir, database_identity, replica_id)) .await .map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled))?; - let (durability, disk_size) = relational_db::local_durability(commitlog_dir, Some(&snapshot_worker)).await?; + let (durability, disk_size) = relational_db::local_durability(replica_dir, Some(&snapshot_worker)).await?; tokio::spawn(relational_db::snapshot_watching_commitlog_compressor( snapshot_worker.subscribe(), diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 93bf29b8091..ce9fdacaa33 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,5 +1,4 @@ use crate::db::durability::DurabilityWorker; -use crate::db::LockFile; use crate::db::MetricsRecorderQueue; use crate::error::{DBError, RestoreSnapshotError}; use crate::messages::control_db::HostType; @@ -10,7 +9,7 @@ use anyhow::{anyhow, Context}; use enum_map::EnumMap; use log::info; use spacetimedb_commitlog::repo::OnNewSegmentFn; -use spacetimedb_commitlog::{self as commitlog, SizeOnDisk}; +use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk}; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError}; use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; @@ -34,14 +33,14 @@ use spacetimedb_datastore::{ }, traits::TxData, }; -use spacetimedb_durability as durability; +use spacetimedb_durability::{self as durability, History}; use spacetimedb_lib::bsatn::ToBsatn; use spacetimedb_lib::db::auth::StAccess; use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql}; use spacetimedb_lib::st_var::StVarValue; use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Identity; -use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath}; +use spacetimedb_paths::server::{ReplicaDir, SnapshotsPath}; use spacetimedb_primitives::*; use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::memory_usage::MemoryUsage; @@ -112,10 +111,6 @@ pub struct RelationalDB { /// An async queue for recording transaction metrics off the main thread metrics_recorder_queue: Option, - - // TODO: Not needed when `durability` is `None` -- move to either - // [Persistence] or [Durability]. 
- lock_file: LockFile, } /// Perform a snapshot every `SNAPSHOT_FREQUENCY` transactions. @@ -137,24 +132,15 @@ impl std::fmt::Debug for RelationalDB { impl Drop for RelationalDB { fn drop(&mut self) { - // Attempt to flush the outstanding transactions, - // while preventing the lock file from being dropped. - // - // Unless the tokio runtime is already shutting down, - // it will run the spawned task to completion. - // Holding the lock file will prevent re-opening the database - // for writing while transactions are still being written to - // the durability backend. + // Attempt to flush the outstanding transactions. if let Some(worker) = self.durability.take() { - let lock_file = self.lock_file.clone(); - worker.spawn_shutdown(self.database_identity, lock_file); + worker.spawn_close(self.database_identity); } } } impl RelationalDB { fn new( - lock: LockFile, database_identity: Identity, owner_identity: Identity, inner: Locking, @@ -182,8 +168,6 @@ impl RelationalDB { workload_type_to_exec_counters, metrics_recorder_queue, - - lock_file: lock, } } @@ -272,7 +256,6 @@ impl RelationalDB { /// [ModuleHost]: crate::host::module_host::ModuleHost #[allow(clippy::too_many_arguments)] pub fn open( - root: &ReplicaDir, database_identity: Identity, owner_identity: Identity, history: impl durability::History, @@ -282,8 +265,6 @@ impl RelationalDB { ) -> Result<(Self, ConnectedClients), DBError> { log::trace!("[{database_identity}] DATABASE: OPEN"); - let lock = LockFile::lock(root)?; - // Check the latest durable TX and restore from a snapshot no newer than it, // so that you drop TXes which were committed but not durable before the restart. // TODO: delete or mark as invalid snapshots newer than this. @@ -326,7 +307,6 @@ impl RelationalDB { .set(elapsed_time.as_secs_f64()); let db = Self::new( - lock, database_identity, owner_identity, inner, @@ -361,21 +341,22 @@ impl RelationalDB { /// Shut down the database, without dropping it. /// /// If the database is in-memory only, this does nothing. - /// Otherwise, it instructs the durability layer to shut down and waits - /// until all outstanding transactions are reported as durable. + /// Otherwise, it instructs the durability layer to shut down + /// and waits until all outstanding transactions are reported as durable. /// - /// Returns `Ok(None)` if the database is in-memory only. - /// Returns the durable [TxOffset] in a `Some` otherwise. + /// After calling this method, calling [Self::commit_tx_downgrade] or + /// [Self::commit_tx] will panic. + /// + /// Returns `None` if the database is in-memory only, + /// or nothing has been durably persisted yet. /// - /// An error is returned if the durability layer failed to make the - /// outstanding transactions durable. - pub async fn shutdown(&self) -> Result, DBError> { + /// Returns the durable [TxOffset] in a `Some` otherwise. + pub async fn shutdown(&self) -> Option { if let Some(durability) = &self.durability { - let durable_offset = durability.shutdown().await?; - return Ok(Some(durable_offset)); + return durability.close().await; } - Ok(None) + None } fn migrate_system_tables(&self) -> Result<(), DBError> { @@ -1731,9 +1712,9 @@ pub type LocalDurability = Arc>; /// Note that this operation can be expensive, as it needs to traverse a suffix /// of the commitlog. 
pub async fn local_durability( - commitlog_dir: CommitLogDir, + replica_dir: ReplicaDir, snapshot_worker: Option<&SnapshotWorker>, -) -> io::Result<(LocalDurability, DiskSizeFn)> { +) -> Result<(LocalDurability, DiskSizeFn), DBError> { let rt = tokio::runtime::Handle::current(); // TODO: Should this better be spawn_blocking? let on_new_segment = snapshot_worker.map(|snapshot_worker| { @@ -1746,7 +1727,7 @@ pub async fn local_durability( }); let local = spawn_rayon(move || { durability::Local::open( - commitlog_dir, + replica_dir.clone(), rt, durability::local::Options { commitlog: commitlog::Options { @@ -1770,6 +1751,14 @@ pub async fn local_durability( Ok((local, disk_size_fn)) } +/// Open a [History] for replay from the local durable state. +/// +/// Currently, this is simply a read-only copy of the commitlog. +pub async fn local_history(replica_dir: &ReplicaDir) -> io::Result> { + let commitlog_dir = replica_dir.commit_log(); + asyncify(move || Commitlog::open(commitlog_dir, <_>::default(), None)).await +} + /// Watches snapshot creation events and compresses all commitlog segments older /// than the snapshot. /// @@ -1863,6 +1852,7 @@ pub mod tests_utils { use spacetimedb_datastore::locking_tx_datastore::TxId; use spacetimedb_fs_utils::compression::CompressType; use spacetimedb_lib::{bsatn::to_vec, ser::Serialize}; + use spacetimedb_paths::server::ReplicaDir; use spacetimedb_paths::server::SnapshotDirPath; use spacetimedb_paths::FromPathUnchecked; use tempfile::TempDir; @@ -1894,7 +1884,7 @@ pub mod tests_utils { Arc, Option>>, Option, - TestDBDir, + Option, ); /// A [`RelationalDB`] in a temporary directory. @@ -1913,11 +1903,7 @@ pub mod tests_utils { pub struct TestDB { pub db: Arc, - // nb: drop order is declaration order durable: Option, - dir: TestDBDir, - - /// Whether to construct a snapshot repository when restarting with [`Self::reopen`]. want_snapshot_repo: bool, } @@ -1941,8 +1927,9 @@ pub mod tests_utils { } struct DurableState { - handle: Arc>, + durability: Arc>, rt: tokio::runtime::Runtime, + replica_dir: TestDBDir, } impl TestDB { @@ -1952,13 +1939,10 @@ pub mod tests_utils { /// Create a [`TestDB`] which does not store data on disk. pub fn in_memory() -> Result { - let dir = TempReplicaDir::new()?; - let db = Self::in_memory_internal(&dir).map(Arc::new)?; + let db = Self::in_memory_internal().map(Arc::new)?; Ok(Self { db, - durable: None, - dir: dir.into(), want_snapshot_repo: false, }) } @@ -1973,13 +1957,16 @@ pub mod tests_utils { let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?; // Enter the runtime so that `Self::durable_internal` can spawn a `SnapshotWorker`. let _rt = rt.enter(); - let (db, handle) = Self::durable_internal(&dir, rt.handle().clone(), true)?; - let durable = DurableState { handle, rt }; + let (db, durability) = Self::durable_internal(&dir, rt.handle().clone(), true)?; + let durable = DurableState { + durability, + rt, + replica_dir: dir.into(), + }; Ok(Self { db: Arc::new(db), durable: Some(durable), - dir: dir.into(), want_snapshot_repo: true, }) } @@ -1989,13 +1976,16 @@ pub mod tests_utils { let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?; // Enter the runtime so that `Self::durable_internal` can spawn a `SnapshotWorker`. 
let _rt = rt.enter(); - let (db, handle) = Self::durable_internal(&dir, rt.handle().clone(), false)?; - let durable = DurableState { handle, rt }; + let (db, durability) = Self::durable_internal(&dir, rt.handle().clone(), false)?; + let durable = DurableState { + durability, + rt, + replica_dir: dir.into(), + }; Ok(Self { db: Arc::new(db), durable: Some(durable), - dir: dir.into(), want_snapshot_repo: false, }) } @@ -2015,8 +2005,8 @@ pub mod tests_utils { }) .transpose()?; - let (local, disk_size_fn) = rt.block_on(local_durability(root.commit_log(), snapshots.as_ref()))?; - let history = local.clone(); + let (local, disk_size_fn) = rt.block_on(local_durability(root.clone(), snapshots.as_ref()))?; + let history = local.as_history(); let persistence = Persistence { durability: local.clone(), @@ -2026,7 +2016,6 @@ pub mod tests_utils { }; let (db, _) = RelationalDB::open( - root, db_identity, owner_identity, history, @@ -2050,12 +2039,10 @@ pub mod tests_utils { history: impl durability::History, expected_num_clients: usize, ) -> Result { - let dir = TempReplicaDir::new()?; - let db = Self::open_db(&dir, history, None, None, expected_num_clients)?; + let db = Self::open_db(history, None, None, expected_num_clients)?; Ok(Self { db: Arc::new(db), durable: None, - dir: dir.into(), want_snapshot_repo: false, }) } @@ -2064,15 +2051,24 @@ pub mod tests_utils { /// to disk (if the database was created via [`Self::durable`]). pub fn reopen(self) -> Result { if let Some(rt) = self.runtime() { - rt.block_on(self.db.shutdown())?; + rt.block_on(self.db.shutdown()); } drop(self.db); - if let Some(DurableState { handle: _, rt }) = self.durable { + if let Some(DurableState { + durability: _, + rt, + replica_dir, + }) = self.durable + { // Enter the runtime so that `Self::durable_internal` can spawn a `SnapshotWorker`. let _rt = rt.enter(); - let (db, handle) = Self::durable_internal(&self.dir, rt.handle().clone(), self.want_snapshot_repo)?; - let durable = DurableState { handle, rt }; + let (db, handle) = Self::durable_internal(&replica_dir, rt.handle().clone(), self.want_snapshot_repo)?; + let durable = DurableState { + durability: handle, + rt, + replica_dir, + }; Ok(Self { db: Arc::new(db), @@ -2080,7 +2076,7 @@ pub mod tests_utils { ..self }) } else { - let db = Self::in_memory_internal(&self.dir).map(Arc::new)?; + let db = Self::in_memory_internal().map(Arc::new)?; Ok(Self { db, ..self }) } } @@ -2092,8 +2088,8 @@ pub mod tests_utils { } /// The root path of the (temporary) database directory. - pub fn path(&self) -> &ReplicaDir { - &self.dir + pub fn path(&self) -> Option<&TestDBDir> { + self.durable.as_ref().map(|dstate| &dstate.replica_dir) } /// Handle to the tokio runtime, available if [`Self::durable`] was used @@ -2105,15 +2101,19 @@ pub mod tests_utils { /// Deconstruct `self` into its constituents. #[allow(unused)] pub fn into_parts(self) -> TestDBParts { - let Self { db, durable, dir, .. } = self; - let (durability, rt) = durable - .map(|DurableState { handle, rt }| (Some(handle), Some(rt))) - .unwrap_or((None, None)); - (db, durability, rt, dir) + let Self { db, durable, .. 
} = self; + match durable { + Some(DurableState { + durability: handle, + rt, + replica_dir, + }) => (db, Some(handle), Some(rt), Some(replica_dir)), + None => (db, None, None, None), + } } - fn in_memory_internal(root: &ReplicaDir) -> Result { - Self::open_db(root, EmptyHistory::new(), None, None, 0) + fn in_memory_internal() -> Result { + Self::open_db(EmptyHistory::new(), None, None, 0) } fn durable_internal( @@ -2127,28 +2127,26 @@ pub mod tests_utils { .map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Disabled)) }) .transpose()?; - let (local, disk_size_fn) = rt.block_on(local_durability(root.commit_log(), snapshots.as_ref()))?; - let history = local.clone(); + let (local, disk_size_fn) = rt.block_on(local_durability(root.clone(), snapshots.as_ref()))?; + let history = local.as_history(); let persistence = Persistence { durability: local.clone(), disk_size: disk_size_fn, snapshots, runtime: rt, }; - let db = Self::open_db(root, history, Some(persistence), None, 0)?; + let db = Self::open_db(history, Some(persistence), None, 0)?; Ok((db, local)) } pub fn open_db( - root: &ReplicaDir, history: impl durability::History, persistence: Option, metrics_recorder_queue: Option, expected_num_clients: usize, ) -> Result { let (db, connected_clients) = RelationalDB::open( - root, Self::DATABASE_IDENTITY, Self::OWNER, history, @@ -2344,7 +2342,6 @@ mod tests { use bytes::Bytes; use commitlog::payload::txdata; use commitlog::Commitlog; - use durability::EmptyHistory; use pretty_assertions::{assert_eq, assert_matches}; use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::error::{DatastoreError, IndexError}; @@ -2358,6 +2355,7 @@ mod tests { use spacetimedb_lib::error::ResultTest; use spacetimedb_lib::Identity; use spacetimedb_lib::Timestamp; + use spacetimedb_paths::server::ReplicaDir; use spacetimedb_paths::FromPathUnchecked; use spacetimedb_sats::buffer::BufReader; use spacetimedb_sats::product; @@ -2469,14 +2467,13 @@ mod tests { stdb.create_table(&mut tx, my_table(AlgebraicType::I32))?; stdb.commit_tx(tx)?; - match RelationalDB::open( - stdb.path(), - Identity::ZERO, - Identity::ZERO, - EmptyHistory::new(), - None, - None, - PagePool::new_for_test(), + match TestDB::open_existing_durable( + stdb.path().unwrap(), + stdb.runtime().unwrap().clone(), + 0, + stdb.database_identity(), + stdb.owner_identity(), + false, ) { Ok(_) => { panic!("Allowed to open database twice") @@ -2484,7 +2481,7 @@ mod tests { Err(e) => match e { DBError::Database(DatabaseError::DatabasedOpened(_, _)) => {} err => { - panic!("Failed with error {err}") + panic!("Failed with error {err:?}") } }, } @@ -2606,7 +2603,7 @@ mod tests { "View table should NOT be empty after insert" ); - let root = stdb.path().snapshots(); + let root = stdb.path().unwrap().snapshots(); let (_, repo) = make_snapshot(root.clone(), Identity::ZERO, 0, CompressType::None, false); stdb.take_snapshot(&repo)?.expect("snapshot should succeed"); @@ -3424,9 +3421,9 @@ mod tests { // Ensure all resources are released and data is flushed to disk. let (db, _, rt, dir) = stdb.into_parts(); let rt = rt.expect("Durable TestDB must have a runtime"); - rt.block_on(db.shutdown()).expect("failed to shut down database"); + rt.block_on(db.shutdown()).expect("should have durable offset"); drop(db); - dir + dir.expect("Durable TestDB must have a database directory") }; // Re-open commitlog and collect inputs. 
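// Illustrative sketch (not part of the patch): the shutdown-and-reopen flow the
// updated tests above rely on. It assumes the `TestDB` helpers from `tests_utils`
// as modified in this diff; the expect/println messages are made up.
fn example_shutdown_then_reopen() -> anyhow::Result<()> {
    let stdb = TestDB::durable()?;
    // ... commit some transactions through `stdb.db` ...
    {
        let rt = stdb.runtime().expect("durable TestDB has a runtime");
        // `RelationalDB::shutdown` now returns `Option<TxOffset>` instead of a
        // `Result`: `Some(offset)` if data was made durable, `None` for
        // in-memory databases or if nothing has been persisted yet.
        let durable_offset = rt.block_on(stdb.db.shutdown());
        println!("durable offset before reopen: {durable_offset:?}");
    }
    // `reopen` consumes the handle, waits for the durability worker to close,
    // and replays the on-disk commitlog into a fresh instance.
    let _stdb = stdb.reopen()?;
    Ok(())
}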
@@ -3569,7 +3566,7 @@ mod tests { // NOTE: `snapshot_watching_compressor` is what filter out the last snapshot #[test] fn compress_snapshot_test() -> ResultTest<()> { - let stdb = TestDB::in_memory()?; + let stdb = TestDB::durable()?; let mut tx = begin_mut_tx(&stdb); let schema = my_table(AlgebraicType::I32); @@ -3579,7 +3576,7 @@ mod tests { } stdb.commit_tx(tx)?; - let root = stdb.path().snapshots(); + let root = stdb.path().unwrap().snapshots(); let (dir, repo) = make_snapshot(root.clone(), Identity::ZERO, 0, CompressType::None, true); stdb.take_snapshot(&repo)?; @@ -3680,9 +3677,10 @@ mod tests { #[test] fn tries_older_snapshots() -> ResultTest<()> { - let stdb = TestDB::in_memory()?; - stdb.path().snapshots().create()?; - let repo = SnapshotRepository::open(stdb.path().snapshots(), stdb.database_identity(), 85)?; + let stdb = TestDB::durable()?; + let snapshots_path = stdb.path().unwrap().snapshots(); + snapshots_path.create()?; + let repo = SnapshotRepository::open(snapshots_path, stdb.database_identity(), 85)?; stdb.take_snapshot(&repo)?.expect("failed to take snapshot"); { @@ -3898,7 +3896,7 @@ mod tests { })?; // Now we are going to shut it down and reopen it, to ensure that the new table can be // read from disk. - rt.block_on(db.shutdown())?; + rt.block_on(db.shutdown()); drop(db); let (db, _) = TestDB::open_existing_durable( diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index d0a7e2cd4d4..a026af589cd 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -8,6 +8,7 @@ use hex::FromHexError; use spacetimedb_commitlog::repo::TxOffset; use spacetimedb_durability::DurabilityExited; use spacetimedb_expr::errors::TypingError; +use spacetimedb_fs_utils::lockfile::advisory::LockError; use spacetimedb_lib::Identity; use spacetimedb_schema::error::ValidationErrors; use spacetimedb_snapshot::SnapshotError; @@ -81,10 +82,16 @@ pub enum PlanError { pub enum DatabaseError { #[error("Replica not found: {0}")] NotFound(u64), - #[error("Database is already opened. Path:`{0}`. Error:{1}")] + #[error("Database is already opened. Path: `{0}`. 
Error: {1}")] DatabasedOpened(PathBuf, anyhow::Error), } +impl From for DatabaseError { + fn from(LockError { path, source }: LockError) -> Self { + Self::DatabasedOpened(path, source.into()) + } +} + #[derive(Error, Debug, EnumAsInner)] pub enum DBError { #[error("LibError: {0}")] @@ -194,6 +201,17 @@ impl<'a, T: ?Sized + 'a> From>> for DBE } } +impl From for DBError { + fn from(e: spacetimedb_durability::local::OpenError) -> Self { + use spacetimedb_durability::local::OpenError::*; + + match e { + Lock(e) => Self::from(DatabaseError::from(e)), + Commitlog(e) => Self::Other(e.into()), + } + } +} + #[derive(Debug, Error)] pub enum LogReplayError { #[error( diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 6621678eee7..7508ca63765 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -503,7 +503,7 @@ impl HostController { info!("exiting replica {} of database {}", replica_id, info.database_identity); module.exit().await; let db = &module.replica_ctx().relational_db; - db.shutdown().await?; + db.shutdown().await; let table_names = info.module_def.tables().map(|t| t.name.deref()); remove_database_gauges(&info.database_identity, table_names); info!("replica {} of database {} exited", replica_id, info.database_identity); @@ -841,7 +841,6 @@ impl Host { let (db, connected_clients) = match config.storage { db::Storage::Memory => RelationalDB::open( - &replica_dir, database.database_identity, database.owner_identity, EmptyHistory::new(), @@ -850,24 +849,17 @@ impl Host { page_pool.clone(), )?, db::Storage::Disk => { - // Open a read-only copy of the local durability to replay from. - let (history, _) = relational_db::local_durability( - replica_dir.commit_log(), - // No need to include a snapshot request channel here, 'cause we're only reading from this instance. - None, - ) - .await?; + // Replay from the local state. + let history = relational_db::local_history(&replica_dir).await?; let persistence = persistence.persistence(&database, replica_id).await?; // Loading a database from persistent storage involves heavy // blocking I/O. `asyncify` to avoid blocking the async worker. 
let (db, clients) = asyncify({ - let replica_dir = replica_dir.clone(); let database_identity = database.database_identity; let owner_identity = database.owner_identity; let page_pool = page_pool.clone(); move || { RelationalDB::open( - &replica_dir, database_identity, owner_identity, history, @@ -991,7 +983,6 @@ impl Host { let phony_replica_dir = ReplicaDir::from_path_unchecked(phony_replica_dir.path().to_owned()); let (db, _connected_clients) = RelationalDB::open( - &phony_replica_dir, database.database_identity, database.owner_identity, EmptyHistory::new(), diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index f12a007985d..739721f75a6 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1255,7 +1255,7 @@ mod tests { ClientActorId, ClientConfig, ClientConnectionReceiver, ClientConnectionSender, ClientName, Protocol, }; use crate::db::relational_db::tests_utils::{ - begin_mut_tx, begin_tx, insert, with_auto_commit, with_read_only, TempReplicaDir, TestDB, + begin_mut_tx, begin_tx, insert, with_auto_commit, with_read_only, TestDB, }; use crate::db::relational_db::{Persistence, RelationalDB, Txdata}; use crate::error::DBError; @@ -1267,6 +1267,7 @@ mod tests { use crate::subscription::query::compile_read_only_query; use crate::subscription::TableUpdateType; use core::fmt; + use futures::FutureExt; use itertools::Itertools; use pretty_assertions::assert_matches; use spacetimedb_client_api_messages::energy::EnergyQuanta; @@ -1373,6 +1374,24 @@ mod tests { fn durable_tx_offset(&self) -> spacetimedb_durability::DurableOffset { self.durable_offset.subscribe().into() } + + fn close(&self) -> spacetimedb_durability::Close { + let mut durable = self.durable_offset.subscribe(); + let commitlog = self.commitlog.clone(); + async move { + let durable_offset = durable + .wait_for( + |offset| match offset.zip(commitlog.read().unwrap().max_committed_offset()) { + Some((durable_offset, committed_offset)) => durable_offset >= committed_offset, + None => false, + }, + ) + .await + .unwrap(); + *durable_offset + } + .boxed() + } } impl Default for ManualDurability { @@ -1397,10 +1416,8 @@ mod tests { fn relational_db_with_manual_durability( rt: tokio::runtime::Handle, ) -> anyhow::Result<(Arc, Arc)> { - let dir = TempReplicaDir::new()?; let durability = Arc::new(ManualDurability::default()); let db = TestDB::open_db( - &dir, EmptyHistory::new(), Some(Persistence { durability: durability.clone(), diff --git a/crates/durability/Cargo.toml b/crates/durability/Cargo.toml index 13d20209e56..fbc2eaa9fae 100644 --- a/crates/durability/Cargo.toml +++ b/crates/durability/Cargo.toml @@ -13,10 +13,12 @@ fallocate = ["spacetimedb-commitlog/fallocate"] [dependencies] anyhow.workspace = true +futures.workspace = true itertools.workspace = true log.workspace = true scopeguard.workspace = true spacetimedb-commitlog.workspace = true +spacetimedb-fs-utils.workspace = true spacetimedb-paths.workspace = true spacetimedb-sats.workspace = true thiserror.workspace = true diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 6f8d319c48d..e3c2902fa5f 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -4,25 +4,26 @@ use std::{ panic, sync::{ atomic::{AtomicU64, Ordering::Relaxed}, - Arc, Weak, + Arc, }, time::Duration, }; -use anyhow::Context as _; +use futures::{FutureExt as _, TryFutureExt 
as _}; use itertools::Itertools as _; use log::{info, trace, warn}; -use scopeguard::defer; use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction}; -use spacetimedb_paths::server::CommitLogDir; +use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile}; +use spacetimedb_paths::server::ReplicaDir; +use thiserror::Error; use tokio::{ - sync::{mpsc, watch}, - task::{spawn_blocking, AbortHandle, JoinHandle}, + sync::{futures::OwnedNotified, mpsc, oneshot, watch, Notify}, + task::spawn_blocking, time::{interval, MissedTickBehavior}, }; -use tracing::instrument; +use tracing::{instrument, Span}; -use crate::{Durability, DurableOffset, History, TxOffset}; +use crate::{Close, Durability, DurableOffset, History, TxOffset}; pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk}; @@ -46,6 +47,16 @@ impl Default for Options { } } +#[derive(Debug, Error)] +pub enum OpenError { + #[error("commitlog directory is locked")] + Lock(#[from] LockError), + #[error("failed to open commitlog")] + Commitlog(#[from] io::Error), +} + +type ShutdownReply = oneshot::Sender; + /// [`Durability`] implementation backed by a [`Commitlog`] on local storage. /// /// The commitlog is constrained to store the canonical [`Txdata`] payload, @@ -73,48 +84,55 @@ pub struct Local { /// This is mainly for observability purposes, and can thus be updated with /// relaxed memory ordering. queue_depth: Arc, - /// Handle to the [`PersisterTask`], allowing to drain the `queue` when - /// explicitly dropped via [`Self::close`]. - persister_task: JoinHandle<()>, + /// Channel to request the actor to exit. + shutdown: mpsc::Sender, } impl Local { - /// Create a [`Local`] instance at the `root` directory. + /// Create a [`Local`] instance at the `replica_dir`. /// - /// The `root` directory must already exist. + /// `replica_dir` must already exist. /// /// Background tasks are spawned onto the provided tokio runtime. /// /// We will send a message down the `on_new_segment` channel whenever we begin a new commitlog segment. /// This is used to capture a snapshot each new segment. pub fn open( - root: CommitLogDir, + replica_dir: ReplicaDir, rt: tokio::runtime::Handle, opts: Options, on_new_segment: Option>, - ) -> io::Result { + ) -> Result { info!("open local durability"); - let clog = Arc::new(Commitlog::open(root, opts.commitlog, on_new_segment)?); - let (queue, rx) = mpsc::unbounded_channel(); + // For backwards-compatibility, we keep using the `db.lock` file. 
+ let lock_path = replica_dir.0.join("db.lock"); + let lockfile = LockedFile::lock(lock_path)?; + + let clog = Arc::new(Commitlog::open( + replica_dir.commit_log(), + opts.commitlog, + on_new_segment, + )?); + let (queue, txdata_rx) = mpsc::unbounded_channel(); let queue_depth = Arc::new(AtomicU64::new(0)); let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset()); + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - let persister_task = rt.spawn( - PersisterTask { + rt.spawn( + Actor { clog: clog.clone(), - rx, + + txdata_rx, + shutdown_rx, + + durable_offset: durable_tx, queue_depth: queue_depth.clone(), + + sync_interval: opts.sync_interval, max_records_in_commit: opts.commitlog.max_records_in_commit, - } - .run(), - ); - rt.spawn( - FlushAndSyncTask { - clog: Arc::downgrade(&clog), - period: opts.sync_interval, - offset: durable_tx, - persister_task: persister_task.abort_handle(), + + lockfile, } .run(), ); @@ -123,11 +141,18 @@ impl Local { clog, durable_offset: durable_rx, queue, + shutdown: shutdown_tx, queue_depth, - persister_task, }) } + /// Obtain a read-only copy of the durable state that implements [History]. + pub fn as_history(&self) -> impl History> { + self.clog.clone() + } +} + +impl Local { /// Inspect how many transactions added via [`Self::append_tx`] are pending /// to be applied to the underlying [`Commitlog`]. pub fn queue_depth(&self) -> u64 { @@ -149,91 +174,134 @@ impl Local { self.clog.compress_segments(offsets) } - /// Apply all outstanding transactions to the [`Commitlog`] and flush it - /// to disk. - /// - /// Returns the durable [`TxOffset`], if any. - pub async fn close(self) -> anyhow::Result> { - info!("close local durability"); - - drop(self.queue); - if let Err(e) = self.persister_task.await { - if e.is_panic() { - return Err(e).context("persister task panicked"); - } - } - - spawn_blocking(move || self.clog.flush_and_sync()) - .await? - .context("failed to sync commitlog") - } - /// Get the size on disk of the underlying [`Commitlog`]. pub fn size_on_disk(&self) -> io::Result { self.clog.size_on_disk() } } -struct PersisterTask { +struct Actor { clog: Arc>>, - rx: mpsc::UnboundedReceiver>, + + txdata_rx: mpsc::UnboundedReceiver>, + shutdown_rx: mpsc::Receiver>, + + durable_offset: watch::Sender>, queue_depth: Arc, + + sync_interval: Duration, max_records_in_commit: NonZeroU16, + + #[allow(unused)] + lockfile: LockedFile, } -impl PersisterTask { - #[instrument(name = "durability::local::persister_task", skip_all)] +impl Actor { + #[instrument(name = "durability::local::actor", skip_all)] async fn run(mut self) { - info!("starting persister task"); - - while let Some(txdata) = self.rx.recv().await { - self.queue_depth.fetch_sub(1, Relaxed); - trace!("received txdata"); - - // If we are writing one commit per tx, trying to buffer is - // fairly pointless. Immediately flush instead. - // - // Otherwise, try `Commitlog::append` as a fast-path which doesn't - // require `spawn_blocking`. - if self.max_records_in_commit.get() == 1 { - self.flush_append(txdata, true).await; - } else if let Err(retry) = self.clog.append(txdata) { - self.flush_append(retry, false).await + info!("starting durability actor"); + + // Always notify waiters (i.e. [Close] futures) when the actor exits. 
+ let done = scopeguard::guard(Arc::new(Notify::new()), |done| done.notify_waiters()); + + let mut sync_interval = interval(self.sync_interval); + sync_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + // `flush_and_sync` when the loop exits without panicking, + // or `flush_and_sync` inside the loop failed. + let mut sync_on_exit = true; + + loop { + tokio::select! { + biased; + + Some(reply) = self.shutdown_rx.recv() => { + self.txdata_rx.close(); + let _ = reply.send(done.clone().notified_owned()); + }, + + data = self.txdata_rx.recv() => { + let Some(txdata) = data else { + break; + }; + self.queue_depth.fetch_sub(1, Relaxed); + // If we are writing one commit per tx, trying to buffer is + // fairly pointless. Immediately flush instead. + // + // Otherwise, try `Commitlog::append` as a fast-path + // that doesn't require `spawn_blocking`. + if self.max_records_in_commit.get() == 1 { + self.flush_append(txdata, true).await; + } else if let Err(retry) = self.clog.append(txdata) { + self.flush_append(retry, false).await + } + }, + + _ = sync_interval.tick() => { + if self.flush_and_sync().await.is_err() { + sync_on_exit = false; + break; + } + } } + } - trace!("appended txdata"); + if sync_on_exit { + let _ = self.flush_and_sync().await; } - info!("exiting persister task"); + info!("exiting durability actor"); } #[instrument(skip_all)] async fn flush_append(&self, txdata: Txdata, flush_after: bool) { let clog = self.clog.clone(); - let task = spawn_blocking(move || { + let span = Span::current(); + spawn_blocking(move || { + let _span = span.enter(); let mut retry = Some(txdata); while let Some(txdata) = retry.take() { if let Err(error::Append { txdata, source }) = clog.append_maybe_flush(txdata) { - flush_error("persister", source); + flush_error("append-maybe-flush", &source); retry = Some(txdata); } } if flush_after { - clog.flush().map(drop).unwrap_or_else(|e| flush_error("persister", e)); + clog.flush() + .map(drop) + .unwrap_or_else(|e| flush_error("flush-after", &e)); } - - trace!("flush-append succeeded"); }) - .await; - if let Err(e) = task { - // Resume panic on the spawned task, - // which will drop the channel receiver, - // which will cause `append_tx` to panic. - if e.is_panic() { - panic::resume_unwind(e.into_panic()) + .await + .expect("commitlog append blocking task panicked") + } + + #[instrument(skip_all)] + async fn flush_and_sync(&self) -> io::Result> { + // Skip if nothing changed. + if let Some((committed, durable)) = self.clog.max_committed_offset().zip(*self.durable_offset.borrow()) { + if committed == durable { + return Ok(None); } } + + let clog = self.clog.clone(); + let span = Span::current(); + spawn_blocking(move || { + let _span = span.enter(); + clog.flush_and_sync() + }) + .await + .expect("commitlog flush-and-sync blocking task panicked") + .inspect_err(|e| flush_error("flush-and-sync", e)) + .inspect(|maybe_offset| { + if let Some(new_offset) = maybe_offset { + trace!("synced to offset {new_offset}"); + self.durable_offset.send_modify(|val| { + val.replace(*new_offset); + }); + } + }) } } @@ -242,71 +310,13 @@ impl PersisterTask { /// Panics if the error indicates that the log may be permanently unwritable. 
#[inline] #[track_caller] -fn flush_error(task: &str, e: io::Error) { +fn flush_error(task: &str, e: &io::Error) { warn!("error flushing commitlog ({task}): {e:?}"); if matches!(e.kind(), io::ErrorKind::AlreadyExists | io::ErrorKind::StorageFull) { panic!("{e}"); } } -struct FlushAndSyncTask { - clog: Weak>>, - period: Duration, - offset: watch::Sender>, - /// Handle to abort the [`PersisterTask`] if fsync panics. - persister_task: AbortHandle, -} - -impl FlushAndSyncTask { - #[instrument(name = "durability::local::flush_and_sync_task", skip_all)] - async fn run(self) { - info!("starting syncer task"); - - let mut interval = interval(self.period); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - - defer!(self.persister_task.abort()); - - loop { - interval.tick().await; - - let Some(clog) = self.clog.upgrade() else { - break; - }; - // Skip if nothing changed. - if let Some(committed) = clog.max_committed_offset() { - if self.offset.borrow().is_some_and(|durable| durable == committed) { - continue; - } - } - - let task = spawn_blocking(move || clog.flush_and_sync()).await; - match task { - Err(e) => { - if e.is_panic() { - panic::resume_unwind(e.into_panic()); - } - break; - } - Ok(Err(e)) => { - flush_error("flush-and-sync", e); - break; - } - Ok(Ok(Some(new_offset))) => { - trace!("synced to offset {new_offset}"); - self.offset.send_modify(|val| { - val.replace(new_offset); - }); - } - // No data to flush. - Ok(Ok(None)) => {} - } - } - - info!("exiting syncer task"); - } -} - impl Durability for Local { type TxData = Txdata; @@ -320,9 +330,33 @@ impl Durability for Local { fn durable_tx_offset(&self) -> DurableOffset { self.durable_offset.clone().into() } + + fn close(&self) -> Close { + info!("close local durability"); + + let durable_offset = self.durable_tx_offset(); + let shutdown = self.shutdown.clone(); + + async move { + let (done_tx, done_rx) = oneshot::channel(); + // Ignore channel errors - those just mean the actor is already gone. 
+ let _ = shutdown + .send(done_tx) + .map_err(drop) + .and_then(|()| done_rx.map_err(drop)) + .and_then(|done| async move { + done.await; + Ok(()) + }) + .await; + + durable_offset.last_seen() + } + .boxed() + } } -impl History for Local { +impl History for Commitlog> { type TxData = Txdata; fn fold_transactions_from(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error> @@ -330,7 +364,7 @@ impl History for Local { D: Decoder, D::Error: From, { - self.clog.fold_transactions_from(offset, decoder) + self.fold_transactions_from(offset, decoder) } fn transactions_from<'a, D>( @@ -343,12 +377,12 @@ impl History for Local { D::Error: From, Self::TxData: 'a, { - self.clog.transactions_from(offset, decoder) + self.transactions_from(offset, decoder) } fn tx_range_hint(&self) -> (TxOffset, Option) { - let min = self.clog.min_committed_offset().unwrap_or_default(); - let max = self.clog.max_committed_offset(); + let min = self.min_committed_offset().unwrap_or_default(); + let max = self.max_committed_offset(); (min, max) } diff --git a/crates/durability/src/imp/mod.rs b/crates/durability/src/imp/mod.rs index 38c450ab141..6f83106d0b3 100644 --- a/crates/durability/src/imp/mod.rs +++ b/crates/durability/src/imp/mod.rs @@ -6,11 +6,16 @@ pub use testing::NoDurability; #[cfg(any(test, feature = "test"))] mod testing { - use std::marker::PhantomData; + use std::{ + future, + marker::PhantomData, + sync::atomic::{AtomicBool, Ordering}, + }; + use futures::FutureExt as _; use tokio::sync::watch; - use crate::{Durability, DurableOffset, TxOffset}; + use crate::{Close, Durability, DurableOffset, TxOffset}; /// A [`Durability`] impl that sends all transactions into the void. /// @@ -18,6 +23,7 @@ mod testing { /// the `test` feature is enabled. pub struct NoDurability { durable_offset: watch::Sender>, + closed: AtomicBool, _txdata: PhantomData, } @@ -26,6 +32,7 @@ mod testing { let (durable_offset, _) = watch::channel(None); Self { durable_offset, + closed: AtomicBool::new(false), _txdata: PhantomData, } } @@ -34,9 +41,19 @@ mod testing { impl Durability for NoDurability { type TxData = T; - fn append_tx(&self, _: Self::TxData) {} + fn append_tx(&self, _: Self::TxData) { + if self.closed.load(Ordering::Relaxed) { + panic!("`close` was called on this `NoDurability` instance"); + } + } + fn durable_tx_offset(&self) -> DurableOffset { self.durable_offset.subscribe().into() } + + fn close(&self) -> Close { + self.closed.store(true, Ordering::Relaxed); + future::ready(*self.durable_offset.borrow()).boxed() + } } } diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index e387cc921c5..25bd41b1c6b 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -1,5 +1,6 @@ use std::{iter, marker::PhantomData, sync::Arc}; +use futures::future::BoxFuture; use thiserror::Error; use tokio::sync::watch; @@ -30,6 +31,7 @@ pub struct DurabilityExited; /// The handle is valid for as long as the [`Durability`] instance it was /// obtained from is live, i.e. able to persist transactions. When the instance /// shuts down or crashes, methods will return errors of type [`DurabilityExited`]. +#[derive(Clone)] pub struct DurableOffset { // TODO: `watch::Receiver::wait_for` will hold a shared lock until all // subscribers have seen the current value. Although it may skip entries, @@ -83,6 +85,12 @@ impl From>> for DurableOffset { } } +/// Future created by [Durability::close]. 
+/// +/// This is a boxed future rather than an associated type, so that [Durability] +/// can be used as a trait object without knowing the type of the `close` future. +pub type Close = BoxFuture<'static, Option>; + /// The durability API. /// /// NOTE: This is a preliminary definition, still under consideration. @@ -110,6 +118,24 @@ pub trait Durability: Send + Sync { /// either because nothing has been persisted yet, or because the status /// cannot be retrieved. fn durable_tx_offset(&self) -> DurableOffset; + + /// Asynchronously request the durability to shut down, without dropping it. + /// + /// Shall close any internal channels, such that it is no longer possible to + /// append new data (i.e. [Durability::append_tx] shall panic). + /// Then, drains the internal queues and attempts to make the remaining data + /// durable. Resolves to the durable [TxOffset]. + /// + /// When the returned future resolves, calls to [Durability::append_tx] must + /// panic, and calling [DurableOffset::last_seen] must return the same value + /// as the future's output. + /// + /// Repeatedly calling `close` on an already closed [Durability] shall + /// return the same [TxOffset]. + /// + /// Note that errors are not propagated, as the [Durability] may already be + /// closed. + fn close(&self) -> Close; } /// Access to the durable history. diff --git a/crates/fs-utils/Cargo.toml b/crates/fs-utils/Cargo.toml index 3846f579426..56ebd4f8c32 100644 --- a/crates/fs-utils/Cargo.toml +++ b/crates/fs-utils/Cargo.toml @@ -8,9 +8,10 @@ description = "Assorted utilities for filesystem operations used in SpacetimeDB" [dependencies] anyhow.workspace = true -thiserror.workspace = true +fs2.workspace = true hex.workspace = true rand.workspace = true +thiserror.workspace = true tokio.workspace = true zstd-framed.workspace = true diff --git a/crates/fs-utils/src/lockfile.rs b/crates/fs-utils/src/lockfile.rs index 0f0fea243ae..725f52d5886 100644 --- a/crates/fs-utils/src/lockfile.rs +++ b/crates/fs-utils/src/lockfile.rs @@ -89,3 +89,85 @@ impl Drop for Lockfile { Self::release_internal(&self.path).unwrap(); } } + +pub mod advisory { + use std::{ + fmt, + fs::{self, File}, + io, + path::{Path, PathBuf}, + }; + + use fs2::FileExt as _; + use thiserror::Error; + + use crate::create_parent_dir; + + #[derive(Debug, Error)] + #[error("failed to lock {}", path.display())] + pub struct LockError { + pub path: PathBuf, + #[source] + pub source: io::Error, + } + + /// A file locked with an exclusive, filesystem-level lock. + /// + /// Uses [`flock(2)`] on Unix platforms, and [`LockFile`] on Windows systems. + /// + /// The file is created if it does not exist. + /// Dropping `Lockfile` releases the lock, but, unlike [super::Lockfile], + /// does not delete the file. + /// + /// [`flock(2)`]: https://man7.org/linux/man-pages/man2/flock.2.html + /// [`LockFile`]: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-lockfile?redirectedfrom=MSDN + pub struct LockedFile { + path: PathBuf, + #[allow(unused)] + lock: File, + } + + impl LockedFile { + /// Attempt to lock `path` using an exclusive lock. + /// + /// The file will be created, + /// including its parent directories, + /// if it does not exist. + /// + /// Note that, unlike [super::Lockfile::for_file], + /// the exact `path` is used -- no extra adjacent `.lock` file is + /// created. 
+ pub fn lock(path: impl AsRef) -> Result { + let path = path.as_ref(); + Self::lock_inner(path).map_err(|source| LockError { + path: path.into(), + source, + }) + } + + fn lock_inner(path: &Path) -> io::Result { + create_parent_dir(path)?; + let lock = File::create(path)?; + lock.try_lock_exclusive()?; + + Ok(Self { + path: path.to_path_buf(), + lock, + }) + } + + /// Release the lock and optionally remove the locked file. + pub fn release(self, remove: bool) -> io::Result<()> { + if remove { + fs::remove_file(&self.path)?; + } + Ok(()) + } + } + + impl fmt::Debug for LockedFile { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("LockedFile").field("path", &self.path).finish() + } + } +} diff --git a/crates/snapshot/tests/remote.rs b/crates/snapshot/tests/remote.rs index 615c92cbb8b..b1fe13c1116 100644 --- a/crates/snapshot/tests/remote.rs +++ b/crates/snapshot/tests/remote.rs @@ -6,10 +6,7 @@ use pretty_assertions::assert_matches; use rand::seq::IndexedRandom as _; use spacetimedb::{ db::{ - relational_db::{ - tests_utils::{TempReplicaDir, TestDB}, - Persistence, SNAPSHOT_FREQUENCY, - }, + relational_db::{tests_utils::TestDB, Persistence, SNAPSHOT_FREQUENCY}, snapshot::{self, SnapshotWorker}, }, error::DBError, @@ -234,15 +231,13 @@ async fn create_snapshot(repo: Arc) -> anyhow::Result::default())), snapshots: Some(SnapshotWorker::new(repo, snapshot::Compression::Disabled)), runtime: rt, }; - let db = TestDB::open_db(&tmp, EmptyHistory::new(), Some(persistence), None, 0)?; + let db = TestDB::open_db(EmptyHistory::new(), Some(persistence), None, 0)?; let watch = db.subscribe_to_snapshots().unwrap(); let table_id = db.with_auto_commit(Workload::Internal, |tx| {
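As a companion to the new `close` API, here is a minimal sketch of an in-memory `Durability` implementation that satisfies the documented contract (appending after `close` panics, and repeated `close` calls resolve to the same offset). It is modelled on the `NoDurability` and `ManualDurability` test doubles touched by this patch; the `InMemoryDurability` name, its constructor, the trait bounds, and the offset bookkeeping are assumptions for illustration, not code from the repository.

use std::{
    marker::PhantomData,
    sync::atomic::{AtomicBool, Ordering},
};

use futures::FutureExt as _;
use spacetimedb_durability::{Close, Durability, DurableOffset, TxOffset};
use tokio::sync::watch;

/// Test double that reports every appended transaction as durable immediately.
struct InMemoryDurability<T> {
    durable: watch::Sender<Option<TxOffset>>,
    closed: AtomicBool,
    _txdata: PhantomData<T>,
}

impl<T> InMemoryDurability<T> {
    fn new() -> Self {
        Self {
            durable: watch::channel(None).0,
            closed: AtomicBool::new(false),
            _txdata: PhantomData,
        }
    }
}

impl<T: Send + Sync> Durability for InMemoryDurability<T> {
    type TxData = T;

    fn append_tx(&self, _tx: Self::TxData) {
        // Per the new contract, appending after `close` must panic.
        assert!(!self.closed.load(Ordering::Acquire), "durability already closed");
        // Pretend the transaction became durable right away.
        self.durable
            .send_modify(|offset| *offset = Some(offset.map_or(0, |o| o + 1)));
    }

    fn durable_tx_offset(&self) -> DurableOffset {
        self.durable.subscribe().into()
    }

    fn close(&self) -> Close {
        // Nothing is buffered, so resolve immediately to the last durable
        // offset; calling `close` again yields the same value.
        self.closed.store(true, Ordering::Release);
        let offset = *self.durable.borrow();
        async move { offset }.boxed()
    }
}

Such an implementation could presumably back a `DurabilityWorker` in tests to exercise the close path without touching the disk or the `db.lock` file.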