From 0a44f6aec96f1ef30f9b05cdeaca5f9511371862 Mon Sep 17 00:00:01 2001
From: Piotr Sarna
Date: Tue, 17 Oct 2023 16:40:02 +0200
Subject: [PATCH 1/5] bottomless: add xz compression option

Transplanted from https://github.com/libsql/sqld/pull/780
---
 bottomless/Cargo.toml        |   2 +-
 bottomless/src/backup.rs     |   5 ++
 bottomless/src/read.rs       |   6 +-
 bottomless/src/replicator.rs | 113 +++++++++++++++++++++++++----------
 4 files changed, 94 insertions(+), 32 deletions(-)

diff --git a/bottomless/Cargo.toml b/bottomless/Cargo.toml
index 05e1b24890..24ee26cb14 100644
--- a/bottomless/Cargo.toml
+++ b/bottomless/Cargo.toml
@@ -10,7 +10,7 @@ description = "Bottomless replication for libSQL"
 
 [dependencies]
 anyhow = "1.0.66"
-async-compression = { version = "0.3.15", features = ["tokio", "gzip"] }
+async-compression = { version = "0.3.15", features = ["tokio", "gzip", "xz"] }
 aws-config = { version = "0.55" }
 aws-sdk-s3 = { version = "0.28" }
 bytes = "1"
diff --git a/bottomless/src/backup.rs b/bottomless/src/backup.rs
index bb6e908167..f3de86af10 100644
--- a/bottomless/src/backup.rs
+++ b/bottomless/src/backup.rs
@@ -116,6 +116,11 @@ impl WalCopier {
                 wal.copy_frames(&mut gzip, len).await?;
                 gzip.shutdown().await?;
             }
+            CompressionKind::Xz => {
+                let mut xz = async_compression::tokio::write::XzEncoder::new(&mut out);
+                wal.copy_frames(&mut xz, len).await?;
+                xz.shutdown().await?;
+            }
         }
         if tracing::enabled!(tracing::Level::DEBUG) {
             let elapsed = Instant::now() - period_start;
diff --git a/bottomless/src/read.rs b/bottomless/src/read.rs
index 1177f60b56..f1837c1926 100644
--- a/bottomless/src/read.rs
+++ b/bottomless/src/read.rs
@@ -1,7 +1,7 @@
 use crate::replicator::CompressionKind;
 use crate::wal::WalFrameHeader;
 use anyhow::Result;
-use async_compression::tokio::bufread::GzipDecoder;
+use async_compression::tokio::bufread::{GzipDecoder, XzEncoder};
 use aws_sdk_s3::primitives::ByteStream;
 use std::io::ErrorKind;
 use std::pin::Pin;
@@ -32,6 +32,10 @@ impl BatchReader {
                 let gzip = GzipDecoder::new(reader);
                 Box::pin(gzip)
             }
+            CompressionKind::Xz => {
+                let xz = XzEncoder::new(reader);
+                Box::pin(xz)
+            }
         },
     }
 }
diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs
index f95bc97389..5ad70a2398 100644
--- a/bottomless/src/replicator.rs
+++ b/bottomless/src/replicator.rs
@@ -5,7 +5,7 @@ use crate::uuid_utils::decode_unix_timestamp;
 use crate::wal::WalFileReader;
 use anyhow::{anyhow, bail};
 use arc_swap::ArcSwapOption;
-use async_compression::tokio::write::GzipEncoder;
+use async_compression::tokio::write::{GzipEncoder, XzEncoder};
 use aws_sdk_s3::config::{Credentials, Region};
 use aws_sdk_s3::error::SdkError;
 use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
@@ -171,7 +171,7 @@ impl Options {
         let secret_access_key = env_var("LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY").ok();
         let region = env_var("LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION").ok();
         let max_frames_per_batch =
-            env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 500).parse::<usize>()?;
+            env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 10000).parse::<usize>()?;
         let s3_upload_max_parallelism =
             env_var_or("LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX", 32).parse::<usize>()?;
         let restore_transaction_page_swap_after =
@@ -653,7 +653,7 @@ impl Replicator {
             CompressionKind::None => Ok(ByteStream::from_path(db_path).await?),
             CompressionKind::Gzip => {
                 let mut reader = File::open(db_path).await?;
-                let gzip_path = Self::db_gzip_path(db_path);
+                let gzip_path = Self::db_compressed_path(db_path, "gz");
                 let compressed_file = OpenOptions::new()
                     .create(true)
                    .write(true)
@@ -671,13 +671,33 @@ impl Replicator {
                 );
                 Ok(ByteStream::from_path(gzip_path).await?)
             }
+            CompressionKind::Xz => {
+                let mut reader = File::open(db_path).await?;
+                let xz_path = Self::db_compressed_path(db_path, "xz");
+                let compressed_file = OpenOptions::new()
+                    .create(true)
+                    .write(true)
+                    .read(true)
+                    .truncate(true)
+                    .open(&xz_path)
+                    .await?;
+                let mut writer = XzEncoder::new(compressed_file);
+                let size = tokio::io::copy(&mut reader, &mut writer).await?;
+                writer.shutdown().await?;
+                tracing::debug!(
+                    "Compressed database file ({} bytes) into `{}`",
+                    size,
+                    xz_path.display()
+                );
+                Ok(ByteStream::from_path(xz_path).await?)
+            }
         }
     }
 
-    fn db_gzip_path(db_path: &Path) -> PathBuf {
-        let mut gzip_path = db_path.to_path_buf();
-        gzip_path.pop();
-        gzip_path.join("db.gz")
+    fn db_compressed_path(db_path: &Path, suffix: &'static str) -> PathBuf {
+        let mut compressed_path: PathBuf = db_path.to_path_buf();
+        compressed_path.pop();
+        compressed_path.join(format!("db.{suffix}"))
     }
 
     fn restore_db_path(&self) -> PathBuf {
@@ -816,9 +836,10 @@ impl Replicator {
             let _ = snapshot_notifier.send(Ok(Some(generation)));
             let elapsed = Instant::now() - start;
             tracing::debug!("Snapshot upload finished (took {:?})", elapsed);
-            // cleanup gzip database snapshot if exists
-            let gzip_path = Self::db_gzip_path(&db_path);
-            let _ = tokio::fs::remove_file(gzip_path).await;
+            // cleanup gzip/xz database snapshot if exists
+            for suffix in &["gz", "xz"] {
+                let _ = tokio::fs::remove_file(Self::db_compressed_path(&db_path, suffix)).await;
+            }
         });
         let elapsed = Instant::now() - start_ts;
         tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed);
@@ -1160,31 +1181,58 @@ impl Replicator {
     }
 
     async fn restore_from_snapshot(&mut self, generation: &Uuid, db: &mut File) -> Result<bool> {
-        let main_db_path = match self.use_compression {
-            CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
-            CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
+        let algos_to_try = match self.use_compression {
+            CompressionKind::None => &[
+                CompressionKind::None,
+                CompressionKind::Xz,
+                CompressionKind::Gzip,
+            ],
+            CompressionKind::Gzip => &[
+                CompressionKind::Gzip,
+                CompressionKind::Xz,
+                CompressionKind::None,
+            ],
+            CompressionKind::Xz => &[
+                CompressionKind::Xz,
+                CompressionKind::Gzip,
+                CompressionKind::None,
+            ],
         };
-        if let Ok(db_file) = self.get_object(main_db_path).send().await {
-            let mut body_reader = db_file.body.into_async_read();
-            let db_size = match self.use_compression {
-                CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
-                CompressionKind::Gzip => {
-                    let mut decompress_reader = async_compression::tokio::bufread::GzipDecoder::new(
-                        tokio::io::BufReader::new(body_reader),
-                    );
-                    tokio::io::copy(&mut decompress_reader, db).await?
-                }
+        for algo in algos_to_try {
+            let main_db_path = match algo {
+                CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
+                CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
+                CompressionKind::Xz => format!("{}-{}/db.xz", self.db_name, generation),
             };
-            db.flush().await?;
+            if let Ok(db_file) = self.get_object(main_db_path).send().await {
+                let mut body_reader = db_file.body.into_async_read();
+                let db_size = match algo {
+                    CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
+                    CompressionKind::Gzip => {
+                        let mut decompress_reader =
+                            async_compression::tokio::bufread::GzipDecoder::new(
+                                tokio::io::BufReader::new(body_reader),
+                            );
+                        tokio::io::copy(&mut decompress_reader, db).await?
+                    }
+                    CompressionKind::Xz => {
+                        let mut decompress_reader =
+                            async_compression::tokio::bufread::XzDecoder::new(
+                                tokio::io::BufReader::new(body_reader),
+                            );
+                        tokio::io::copy(&mut decompress_reader, db).await?
+                    }
+                };
+                db.flush().await?;
 
-            let page_size = Self::read_page_size(db).await?;
-            self.set_page_size(page_size)?;
-            tracing::info!("Restored the main database file ({} bytes)", db_size);
-            Ok(true)
-        } else {
-            Ok(false)
+                let page_size = Self::read_page_size(db).await?;
+                self.set_page_size(page_size)?;
+                tracing::info!("Restored the main database file ({} bytes)", db_size);
+                return Ok(true);
+            }
         }
+        Ok(false)
     }
 
     async fn restore_wal(
@@ -1235,6 +1283,7 @@
             Some(result) => result,
             None => {
                 if !key.ends_with(".gz")
+                    && !key.ends_with(".xz")
                     && !key.ends_with(".db")
                     && !key.ends_with(".meta")
                     && !key.ends_with(".dep")
@@ -1423,6 +1472,7 @@
         let str = fpath.to_str()?;
         if str.ends_with(".db")
             | str.ends_with(".gz")
+            | str.ends_with(".xz")
             | str.ends_with(".raw")
             | str.ends_with(".meta")
             | str.ends_with(".dep")
@@ -1670,6 +1720,7 @@ pub enum CompressionKind {
     #[default]
     None,
     Gzip,
+    Xz,
 }
 
 impl CompressionKind {
@@ -1677,6 +1728,7 @@ pub fn parse(kind: &str) -> Result<Self, &str> {
         match kind {
             "gz" | "gzip" => Ok(CompressionKind::Gzip),
             "raw" | "" => Ok(CompressionKind::None),
+            "xz" => Ok(CompressionKind::Xz),
             other => Err(other),
         }
     }
@@ -1687,6 +1739,7 @@ impl std::fmt::Display for CompressionKind {
         match self {
             CompressionKind::None => write!(f, "raw"),
             CompressionKind::Gzip => write!(f, "gz"),
+            CompressionKind::Xz => write!(f, "xz"),
         }
     }
 }
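The write/read pairing above is easy to get backwards: backup.rs needs an encoder, read.rs needs a decoder. A minimal, self-contained round trip of that pattern, not part of the patch, assuming only tokio and async-compression with the "tokio" and "xz" features enabled:

    use async_compression::tokio::{bufread::XzDecoder, write::XzEncoder};
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // Write path, as in backup.rs: wrap the sink in an *encoder* and
        // shut it down so the xz stream footer is flushed.
        let mut encoder = XzEncoder::new(Vec::new());
        encoder.write_all(b"WAL frames would go here").await?;
        encoder.shutdown().await?;
        let compressed = encoder.into_inner();

        // Read path, as in read.rs: wrap the source in a *decoder*.
        // Patch 1 instantiates XzEncoder here by mistake; the fixup
        // below swaps it for XzDecoder.
        let mut decoder = XzDecoder::new(&compressed[..]);
        let mut restored = Vec::new();
        decoder.read_to_end(&mut restored).await?;
        assert_eq!(&restored[..], &b"WAL frames would go here"[..]);
        Ok(())
    }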
From 5efb42b6fee2c01a233942179f351a14d8257582 Mon Sep 17 00:00:01 2001
From: Piotr Sarna
Date: Wed, 18 Oct 2023 10:02:15 +0200
Subject: [PATCH 2/5] fixup: Decoder should be used instead of Encoder in read.rs

---
 bottomless/src/read.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/bottomless/src/read.rs b/bottomless/src/read.rs
index f1837c1926..07e511a005 100644
--- a/bottomless/src/read.rs
+++ b/bottomless/src/read.rs
@@ -1,7 +1,7 @@
 use crate::replicator::CompressionKind;
 use crate::wal::WalFrameHeader;
 use anyhow::Result;
-use async_compression::tokio::bufread::{GzipDecoder, XzEncoder};
+use async_compression::tokio::bufread::{GzipDecoder, XzDecoder};
 use aws_sdk_s3::primitives::ByteStream;
 use std::io::ErrorKind;
 use std::pin::Pin;
@@ -33,7 +33,7 @@ impl BatchReader {
                 Box::pin(gzip)
             }
             CompressionKind::Xz => {
-                let xz = XzEncoder::new(reader);
+                let xz = XzDecoder::new(reader);
                 Box::pin(xz)
             }
         },

From c2023d46f367f64dd586cdadd4ed46f12b6e92f6 Mon Sep 17 00:00:01 2001
From: Piotr Sarna
Date: Wed, 18 Oct 2023 10:03:06 +0200
Subject: [PATCH 3/5] bottomless: update async-compression to 0.4.4

---
 bottomless/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bottomless/Cargo.toml b/bottomless/Cargo.toml
index 24ee26cb14..a06b0f422b 100644
--- a/bottomless/Cargo.toml
+++ b/bottomless/Cargo.toml
@@ -10,7 +10,7 @@ description = "Bottomless replication for libSQL"
 
 [dependencies]
 anyhow = "1.0.66"
-async-compression = { version = "0.3.15", features = ["tokio", "gzip", "xz"] }
+async-compression = { version = "0.4.4", features = ["tokio", "gzip", "xz"] }
 aws-config = { version = "0.55" }
 aws-sdk-s3 = { version = "0.28" }
 bytes = "1"
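Before the algorithm changes again below, it is worth spelling out the fallback behavior patch 1 introduced in restore_from_snapshot: the configured kind is probed first and the remaining suffixes act as fallbacks, so snapshots written under an older LIBSQL_BOTTOMLESS_COMPRESSION setting still restore. A standalone sketch of that key order; the helper functions are illustrative, not crate API, and only the key layout and the lookup table come from the diffs:

    // Mirrors the algos_to_try tables from patch 1, expressed over the
    // string names that CompressionKind parses and displays.
    fn algos_to_try(configured: &str) -> [&'static str; 3] {
        match configured {
            "raw" => ["raw", "xz", "gz"],
            "gz" => ["gz", "xz", "raw"],
            _ => ["xz", "gz", "raw"],
        }
    }

    fn snapshot_key(db_name: &str, generation: &str, kind: &str) -> String {
        // "raw" snapshots live under db.db, compressed ones under db.<kind>.
        let suffix = if kind == "raw" { "db" } else { kind };
        format!("{}-{}/db.{}", db_name, generation, suffix)
    }

    fn main() {
        for kind in algos_to_try("xz") {
            // Prints db-1234/db.xz, db-1234/db.gz, db-1234/db.db in turn.
            println!("{}", snapshot_key("db", "1234", kind));
        }
    }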
From 6f96daa9552110c4ddcad8808172af97181449fc Mon Sep 17 00:00:01 2001
From: Piotr Sarna
Date: Wed, 18 Oct 2023 11:08:51 +0200
Subject: [PATCH 4/5] bottomless: actually, add zstd

During stress tests, xz turned out to fail sporadically while
compressing, and bzip2 behaved the same way. Every compression
algorithm is backed by a separate crate, so the failing ones could
simply be ruled out. Zstd proved to be:
- fast
- correct
- more than acceptable in compression ratio
---
 bottomless/Cargo.toml        |  2 +-
 bottomless/src/backup.rs     |  8 +++----
 bottomless/src/read.rs       |  8 +++----
 bottomless/src/replicator.rs | 42 ++++++++++++++++++------------------
 4 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/bottomless/Cargo.toml b/bottomless/Cargo.toml
index a06b0f422b..e72ca093e1 100644
--- a/bottomless/Cargo.toml
+++ b/bottomless/Cargo.toml
@@ -10,7 +10,7 @@ description = "Bottomless replication for libSQL"
 
 [dependencies]
 anyhow = "1.0.66"
-async-compression = { version = "0.4.4", features = ["tokio", "gzip", "xz"] }
+async-compression = { version = "0.4.4", features = ["tokio", "gzip", "zstd"] }
 aws-config = { version = "0.55" }
 aws-sdk-s3 = { version = "0.28" }
 bytes = "1"
diff --git a/bottomless/src/backup.rs b/bottomless/src/backup.rs
index f3de86af10..50b16c78b1 100644
--- a/bottomless/src/backup.rs
+++ b/bottomless/src/backup.rs
@@ -116,10 +116,10 @@ impl WalCopier {
                 wal.copy_frames(&mut gzip, len).await?;
                 gzip.shutdown().await?;
             }
-            CompressionKind::Xz => {
-                let mut xz = async_compression::tokio::write::XzEncoder::new(&mut out);
-                wal.copy_frames(&mut xz, len).await?;
-                xz.shutdown().await?;
+            CompressionKind::Zstd => {
+                let mut zstd = async_compression::tokio::write::ZstdEncoder::new(&mut out);
+                wal.copy_frames(&mut zstd, len).await?;
+                zstd.shutdown().await?;
             }
         }
         if tracing::enabled!(tracing::Level::DEBUG) {
diff --git a/bottomless/src/read.rs b/bottomless/src/read.rs
index 07e511a005..5f53532cac 100644
--- a/bottomless/src/read.rs
+++ b/bottomless/src/read.rs
@@ -1,7 +1,7 @@
 use crate::replicator::CompressionKind;
 use crate::wal::WalFrameHeader;
 use anyhow::Result;
-use async_compression::tokio::bufread::{GzipDecoder, XzDecoder};
+use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
 use aws_sdk_s3::primitives::ByteStream;
 use std::io::ErrorKind;
 use std::pin::Pin;
@@ -32,9 +32,9 @@ impl BatchReader {
                 let gzip = GzipDecoder::new(reader);
                 Box::pin(gzip)
             }
-            CompressionKind::Xz => {
-                let xz = XzDecoder::new(reader);
-                Box::pin(xz)
+            CompressionKind::Zstd => {
+                let zstd = ZstdDecoder::new(reader);
+                Box::pin(zstd)
             }
         },
     }
diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs
index 5ad70a2398..3994608d46 100644
--- a/bottomless/src/replicator.rs
+++ b/bottomless/src/replicator.rs
@@ -5,7 +5,7 @@ use crate::uuid_utils::decode_unix_timestamp;
 use crate::wal::WalFileReader;
 use anyhow::{anyhow, bail};
 use arc_swap::ArcSwapOption;
-use async_compression::tokio::write::{GzipEncoder, XzEncoder};
+use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
 use aws_sdk_s3::config::{Credentials, Region};
 use aws_sdk_s3::error::SdkError;
 use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
@@ -671,25 +671,25 @@ impl Replicator {
                 );
                 Ok(ByteStream::from_path(gzip_path).await?)
             }
-            CompressionKind::Xz => {
+            CompressionKind::Zstd => {
                 let mut reader = File::open(db_path).await?;
-                let xz_path = Self::db_compressed_path(db_path, "xz");
+                let zstd_path = Self::db_compressed_path(db_path, "zstd");
                 let compressed_file = OpenOptions::new()
                     .create(true)
                     .write(true)
                     .read(true)
                     .truncate(true)
-                    .open(&xz_path)
+                    .open(&zstd_path)
                     .await?;
-                let mut writer = XzEncoder::new(compressed_file);
+                let mut writer = ZstdEncoder::new(compressed_file);
                 let size = tokio::io::copy(&mut reader, &mut writer).await?;
                 writer.shutdown().await?;
                 tracing::debug!(
                     "Compressed database file ({} bytes) into `{}`",
                     size,
-                    xz_path.display()
+                    zstd_path.display()
                 );
-                Ok(ByteStream::from_path(xz_path).await?)
+                Ok(ByteStream::from_path(zstd_path).await?)
             }
         }
     }
@@ -836,8 +836,8 @@ impl Replicator {
             let _ = snapshot_notifier.send(Ok(Some(generation)));
             let elapsed = Instant::now() - start;
             tracing::debug!("Snapshot upload finished (took {:?})", elapsed);
-            // cleanup gzip/xz database snapshot if exists
-            for suffix in &["gz", "xz"] {
+            // cleanup gzip/zstd database snapshot if exists
+            for suffix in &["gz", "zstd"] {
                 let _ = tokio::fs::remove_file(Self::db_compressed_path(&db_path, suffix)).await;
             }
         });
@@ -1184,16 +1184,16 @@ impl Replicator {
         let algos_to_try = match self.use_compression {
             CompressionKind::None => &[
                 CompressionKind::None,
-                CompressionKind::Xz,
+                CompressionKind::Zstd,
                 CompressionKind::Gzip,
             ],
             CompressionKind::Gzip => &[
                 CompressionKind::Gzip,
-                CompressionKind::Xz,
+                CompressionKind::Zstd,
                 CompressionKind::None,
             ],
-            CompressionKind::Xz => &[
-                CompressionKind::Xz,
+            CompressionKind::Zstd => &[
+                CompressionKind::Zstd,
                 CompressionKind::Gzip,
                 CompressionKind::None,
             ],
@@ -1203,7 +1203,7 @@ impl Replicator {
             let main_db_path = match algo {
                 CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
                 CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
-                CompressionKind::Xz => format!("{}-{}/db.xz", self.db_name, generation),
+                CompressionKind::Zstd => format!("{}-{}/db.zstd", self.db_name, generation),
             };
             if let Ok(db_file) = self.get_object(main_db_path).send().await {
                 let mut body_reader = db_file.body.into_async_read();
@@ -1216,9 +1216,9 @@ impl Replicator {
                             );
                         tokio::io::copy(&mut decompress_reader, db).await?
                     }
-                    CompressionKind::Xz => {
-                        let mut decompress_reader =
-                            async_compression::tokio::bufread::XzDecoder::new(
+                    CompressionKind::Zstd => {
+                        let mut decompress_reader =
+                            async_compression::tokio::bufread::ZstdDecoder::new(
                                 tokio::io::BufReader::new(body_reader),
                             );
                         tokio::io::copy(&mut decompress_reader, db).await?
@@ -1283,7 +1283,7 @@ impl Replicator {
             Some(result) => result,
             None => {
                 if !key.ends_with(".gz")
-                    && !key.ends_with(".xz")
+                    && !key.ends_with(".zstd")
                     && !key.ends_with(".db")
                     && !key.ends_with(".meta")
                     && !key.ends_with(".dep")
@@ -1472,7 +1472,7 @@ impl Replicator {
         let str = fpath.to_str()?;
         if str.ends_with(".db")
             | str.ends_with(".gz")
-            | str.ends_with(".xz")
+            | str.ends_with(".zstd")
             | str.ends_with(".raw")
             | str.ends_with(".meta")
             | str.ends_with(".dep")
@@ -1720,7 +1720,7 @@ pub enum CompressionKind {
     #[default]
     None,
     Gzip,
-    Xz,
+    Zstd,
 }
 
 impl CompressionKind {
@@ -1728,7 +1728,7 @@ pub fn parse(kind: &str) -> Result<Self, &str> {
         match kind {
             "gz" | "gzip" => Ok(CompressionKind::Gzip),
             "raw" | "" => Ok(CompressionKind::None),
-            "xz" => Ok(CompressionKind::Xz),
+            "zstd" => Ok(CompressionKind::Zstd),
             other => Err(other),
         }
     }
@@ -1739,7 +1739,7 @@ impl std::fmt::Display for CompressionKind {
         match self {
             CompressionKind::None => write!(f, "raw"),
             CompressionKind::Gzip => write!(f, "gz"),
-            CompressionKind::Xz => write!(f, "xz"),
+            CompressionKind::Zstd => write!(f, "zstd"),
         }
     }
 }
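The claim behind the default switch below is measurable with a few lines: compress the same page-sized buffer with both codecs and compare the output sizes. A rough, self-contained harness, illustrative only, with a synthetic buffer standing in for a real libSQL page, assuming async-compression 0.4 with the "tokio", "gzip" and "zstd" features:

    use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
    use tokio::io::AsyncWriteExt;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // Stand-in for a 4 KiB libSQL page; real numbers depend on
        // actual page content, so treat this as a harness, not a result.
        let page = [0x2a_u8; 4096];

        let mut gz = GzipEncoder::new(Vec::new());
        gz.write_all(&page).await?;
        gz.shutdown().await?;

        let mut zstd = ZstdEncoder::new(Vec::new());
        zstd.write_all(&page).await?;
        zstd.shutdown().await?;

        println!(
            "gzip: {} bytes, zstd: {} bytes",
            gz.into_inner().len(),
            zstd.into_inner().len()
        );
        Ok(())
    }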
From acda80301a5c5270a362e48e23c6ca1e09f58bb5 Mon Sep 17 00:00:01 2001
From: Piotr Sarna
Date: Wed, 18 Oct 2023 11:17:11 +0200
Subject: [PATCH 5/5] bottomless: switch to zstd as default

Gzip does not perform well on data in the form of libSQL's 4KiB pages:
zstd performed uniformly better in all test cases I covered locally,
and no worse on random data with very high entropy.
---
 bottomless/src/replicator.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs
index 3994608d46..fe5bf847f6 100644
--- a/bottomless/src/replicator.rs
+++ b/bottomless/src/replicator.rs
@@ -179,7 +179,7 @@
         let restore_transaction_cache_fpath =
             env_var_or("LIBSQL_BOTTOMLESS_RESTORE_TXN_FILE", ".bottomless.restore");
         let use_compression =
-            CompressionKind::parse(&env_var_or("LIBSQL_BOTTOMLESS_COMPRESSION", "gz"))
+            CompressionKind::parse(&env_var_or("LIBSQL_BOTTOMLESS_COMPRESSION", "zstd"))
                 .map_err(|e| anyhow!("unknown compression kind: {}", e))?;
         let verify_crc = match env_var_or("LIBSQL_BOTTOMLESS_VERIFY_CRC", true)
             .to_lowercase()