Skip to content

Commit 9259984

Browse files
committed
pass visible seqno watermark into Tree
1 parent 6a8492b commit 9259984

File tree

101 files changed

+774
-328
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

101 files changed

+774
-328
lines changed

src/blob_tree/ingest.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ impl<'a> BlobIngestion<'a> {
136136
///
137137
/// Will return `Err` if an IO error occurs.
138138
#[allow(clippy::significant_drop_tightening)]
139-
pub fn finish(self) -> crate::Result<SeqNo> {
139+
pub fn finish(self) -> crate::Result<()> {
140140
use crate::AbstractTree;
141141

142142
let index = self.index().clone();
@@ -236,6 +236,7 @@ impl<'a> BlobIngestion<'a> {
236236
Ok(copy)
237237
},
238238
global_seqno,
239+
&self.tree.index.config.visible_seqno,
239240
)?;
240241

241242
// Perform maintenance on the version history (e.g., clean up old versions).
@@ -244,7 +245,7 @@ impl<'a> BlobIngestion<'a> {
244245
log::warn!("Version GC failed: {e:?}");
245246
}
246247

247-
Ok(global_seqno)
248+
Ok(())
248249
}
249250

250251
#[inline]

src/compaction/fifo.rs

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,12 @@ mod tests {
161161
#[test]
162162
fn fifo_empty_levels() -> crate::Result<()> {
163163
let dir = tempfile::tempdir()?;
164-
let tree = Config::new(dir.path(), SequenceNumberCounter::default()).open()?;
164+
let tree = Config::new(
165+
dir.path(),
166+
SequenceNumberCounter::default(),
167+
SequenceNumberCounter::default(),
168+
)
169+
.open()?;
165170

166171
let fifo = Arc::new(Strategy::new(1, None));
167172
tree.compact(fifo, 0)?;
@@ -173,7 +178,12 @@ mod tests {
173178
#[test]
174179
fn fifo_below_limit() -> crate::Result<()> {
175180
let dir = tempfile::tempdir()?;
176-
let tree = Config::new(dir.path(), SequenceNumberCounter::default()).open()?;
181+
let tree = Config::new(
182+
dir.path(),
183+
SequenceNumberCounter::default(),
184+
SequenceNumberCounter::default(),
185+
)
186+
.open()?;
177187

178188
for i in 0..4u8 {
179189
tree.insert([b'k', i].as_slice(), "v", u64::from(i));
@@ -191,7 +201,12 @@ mod tests {
191201
#[test]
192202
fn fifo_more_than_limit() -> crate::Result<()> {
193203
let dir = tempfile::tempdir()?;
194-
let tree = Config::new(dir.path(), SequenceNumberCounter::default()).open()?;
204+
let tree = Config::new(
205+
dir.path(),
206+
SequenceNumberCounter::default(),
207+
SequenceNumberCounter::default(),
208+
)
209+
.open()?;
195210

196211
for i in 0..4u8 {
197212
tree.insert([b'k', i].as_slice(), "v", u64::from(i));
@@ -210,9 +225,13 @@ mod tests {
210225
#[test]
211226
fn fifo_more_than_limit_blobs() -> crate::Result<()> {
212227
let dir = tempfile::tempdir()?;
213-
let tree = Config::new(dir.path(), SequenceNumberCounter::default())
214-
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
215-
.open()?;
228+
let tree = Config::new(
229+
dir.path(),
230+
SequenceNumberCounter::default(),
231+
SequenceNumberCounter::default(),
232+
)
233+
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
234+
.open()?;
216235

217236
for i in 0..3u8 {
218237
tree.insert([b'k', i].as_slice(), "$", u64::from(i));
@@ -230,7 +249,12 @@ mod tests {
230249
#[test]
231250
fn fifo_ttl() -> crate::Result<()> {
232251
let dir = tempfile::tempdir()?;
233-
let tree = Config::new(dir.path(), SequenceNumberCounter::default()).open()?;
252+
let tree = Config::new(
253+
dir.path(),
254+
SequenceNumberCounter::default(),
255+
SequenceNumberCounter::default(),
256+
)
257+
.open()?;
234258

235259
// Freeze time and create first (older) table at t=1000s
236260
crate::time::set_unix_timestamp_for_test(Some(std::time::Duration::from_secs(1_000)));
@@ -260,9 +284,13 @@ mod tests {
260284
#[test]
261285
fn fifo_ttl_then_limit_additional_drops_blob_unit() -> crate::Result<()> {
262286
let dir = tempfile::tempdir()?;
263-
let tree = Config::new(dir.path(), SequenceNumberCounter::default())
264-
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
265-
.open()?;
287+
let tree = Config::new(
288+
dir.path(),
289+
SequenceNumberCounter::default(),
290+
SequenceNumberCounter::default(),
291+
)
292+
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
293+
.open()?;
266294

267295
// Create two tables; we will expire them via time override and force additional drops via limit.
268296
tree.insert("a", "$", 0);

src/compaction/flavour.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ impl CompactionFlavour for RelocatingCompaction {
296296
Ok(copy)
297297
},
298298
&opts.global_seqno,
299+
&opts.visible_seqno,
299300
)?;
300301

301302
// NOTE: If the application were to crash >here< it's fine
@@ -424,6 +425,7 @@ impl CompactionFlavour for StandardCompaction {
424425
Ok(copy)
425426
},
426427
&opts.global_seqno,
428+
&opts.visible_seqno,
427429
)?;
428430

429431
// NOTE: If the application were to crash >here< it's fine

src/compaction/state/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,16 @@ mod tests {
3232
use test_log::test;
3333

3434
#[test]
35-
#[ignore]
35+
#[ignore = "wip"]
3636
fn level_manifest_atomicity() -> crate::Result<()> {
3737
let folder = tempfile::tempdir()?;
3838

39-
let tree = crate::Config::new(folder, SequenceNumberCounter::default()).open()?;
39+
let tree = crate::Config::new(
40+
folder,
41+
SequenceNumberCounter::default(),
42+
SequenceNumberCounter::default(),
43+
)
44+
.open()?;
4045

4146
tree.insert("a", "a", 0);
4247
tree.flush_active_memtable(0)?;

src/compaction/worker.rs

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ pub struct Options {
3636

3737
pub global_seqno: SequenceNumberCounter,
3838

39+
pub visible_seqno: SequenceNumberCounter,
40+
3941
pub table_id_generator: SequenceNumberCounter,
4042

4143
pub blob_file_id_generator: SequenceNumberCounter,
@@ -65,6 +67,7 @@ impl Options {
6567
pub fn from_tree(tree: &crate::Tree, strategy: Arc<dyn CompactionStrategy>) -> Self {
6668
Self {
6769
global_seqno: tree.config.seqno.clone(),
70+
visible_seqno: tree.config.visible_seqno.clone(),
6871
tree_id: tree.id,
6972
table_id_generator: tree.table_id_counter.clone(),
7073
blob_file_id_generator: tree.blob_file_id_counter.clone(),
@@ -222,6 +225,7 @@ fn move_tables(
222225
Ok(copy)
223226
},
224227
&opts.global_seqno,
228+
&opts.visible_seqno,
225229
)?;
226230

227231
if let Err(e) = version_history_lock.maintenance(&opts.config.path, opts.mvcc_gc_watermark) {
@@ -569,6 +573,7 @@ fn drop_tables(
569573
Ok(copy)
570574
},
571575
&opts.global_seqno,
576+
&opts.visible_seqno,
572577
)?;
573578

574579
if let Err(e) = version_history_lock.maintenance(&opts.config.path, opts.mvcc_gc_watermark) {
@@ -611,7 +616,12 @@ mod tests {
611616
fn compaction_drop_tables() -> crate::Result<()> {
612617
let folder = tempfile::tempdir()?;
613618

614-
let tree = crate::Config::new(folder, SequenceNumberCounter::default()).open()?;
619+
let tree = crate::Config::new(
620+
folder,
621+
SequenceNumberCounter::default(),
622+
SequenceNumberCounter::default(),
623+
)
624+
.open()?;
615625

616626
tree.insert("a", "a", 0);
617627
tree.flush_active_memtable(0)?;
@@ -656,16 +666,20 @@ mod tests {
656666

657667
let folder = tempfile::tempdir()?;
658668

659-
let tree = crate::Config::new(folder, SequenceNumberCounter::default())
660-
.data_block_size_policy(BlockSizePolicy::all(1))
661-
.with_kv_separation(Some(
662-
KvSeparationOptions::default()
663-
.separation_threshold(1)
664-
.age_cutoff(1.0)
665-
.staleness_threshold(0.01)
666-
.compression(crate::CompressionType::None),
667-
))
668-
.open()?;
669+
let tree = crate::Config::new(
670+
folder,
671+
SequenceNumberCounter::default(),
672+
SequenceNumberCounter::default(),
673+
)
674+
.data_block_size_policy(BlockSizePolicy::all(1))
675+
.with_kv_separation(Some(
676+
KvSeparationOptions::default()
677+
.separation_threshold(1)
678+
.age_cutoff(1.0)
679+
.staleness_threshold(0.01)
680+
.compression(crate::CompressionType::None),
681+
))
682+
.open()?;
669683

670684
tree.insert("a", "a", 0);
671685
tree.insert("b", "b", 0);

src/config/mod.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ pub struct Config {
234234
///
235235
/// Should be shared between multple trees of a database
236236
pub(crate) seqno: SequenceNumberCounter,
237+
238+
pub(crate) visible_seqno: SequenceNumberCounter,
237239
}
238240

239241
// TODO: remove default?
@@ -243,6 +245,7 @@ impl Default for Config {
243245
path: absolute_path(Path::new(DEFAULT_FILE_FOLDER)),
244246
descriptor_table: Arc::new(DescriptorTable::new(256)),
245247
seqno: SequenceNumberCounter::default(),
248+
visible_seqno: SequenceNumberCounter::default(),
246249

247250
cache: Arc::new(Cache::with_capacity_bytes(
248251
/* 16 MiB */ 16 * 1_024 * 1_024,
@@ -293,10 +296,15 @@ impl Default for Config {
293296

294297
impl Config {
295298
/// Initializes a new config
296-
pub fn new<P: AsRef<Path>>(path: P, seqno: SequenceNumberCounter) -> Self {
299+
pub fn new<P: AsRef<Path>>(
300+
path: P,
301+
seqno: SequenceNumberCounter,
302+
visible_seqno: SequenceNumberCounter,
303+
) -> Self {
297304
Self {
298305
path: absolute_path(path.as_ref()),
299306
seqno,
307+
visible_seqno,
300308
..Default::default()
301309
}
302310
}

src/ingestion.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl AnyIngestion<'_> {
5353
/// # Errors
5454
///
5555
/// Will return `Err` if an IO error occurs.
56-
pub fn finish(self) -> crate::Result<SeqNo> {
56+
pub fn finish(self) -> crate::Result<()> {
5757
match self {
5858
Self::Standard(i) => i.finish(),
5959
Self::Blob(b) => b.finish(),

src/run_reader.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,12 @@ mod tests {
130130
#[test]
131131
fn run_reader_skip() -> crate::Result<()> {
132132
let tempdir = tempfile::tempdir()?;
133-
let tree = crate::Config::new(&tempdir, SequenceNumberCounter::default()).open()?;
133+
let tree = crate::Config::new(
134+
&tempdir,
135+
SequenceNumberCounter::default(),
136+
SequenceNumberCounter::default(),
137+
)
138+
.open()?;
134139

135140
let ids = [
136141
["a", "b", "c"],
@@ -165,7 +170,12 @@ mod tests {
165170
#[expect(clippy::unwrap_used)]
166171
fn run_reader_basic() -> crate::Result<()> {
167172
let tempdir = tempfile::tempdir()?;
168-
let tree = crate::Config::new(&tempdir, SequenceNumberCounter::default()).open()?;
173+
let tree = crate::Config::new(
174+
&tempdir,
175+
SequenceNumberCounter::default(),
176+
SequenceNumberCounter::default(),
177+
)
178+
.open()?;
169179

170180
let ids = [
171181
["a", "b", "c"],

src/run_scanner.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,12 @@ mod tests {
7272
#[test]
7373
fn run_scanner_basic() -> crate::Result<()> {
7474
let tempdir = tempfile::tempdir()?;
75-
let tree = crate::Config::new(&tempdir, SequenceNumberCounter::default()).open()?;
75+
let tree = crate::Config::new(
76+
&tempdir,
77+
SequenceNumberCounter::default(),
78+
SequenceNumberCounter::default(),
79+
)
80+
.open()?;
7681

7782
let ids = [
7883
["a", "b", "c"],

src/seqno.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl SequenceNumberCounter {
4848
Self(Arc::new(AtomicU64::new(prev)))
4949
}
5050

51-
/// Gets the next sequence number, without incrementing the counter.
51+
/// Gets the would-be-next sequence number, without incrementing the counter.
5252
///
5353
/// This should only be used when creating a snapshot.
5454
#[must_use]

0 commit comments

Comments
 (0)