211 changes: 14 additions & 197 deletions src/lib.rs
@@ -96,7 +96,9 @@ mod encrypt;
mod error;
#[cfg(feature = "python")]
mod python;
mod stream;
mod stream_decrypt;
mod stream_encrypt;
mod stream_file;
/// Contains the old disk-dependent streaming decryptor/encryptor,
/// which uses the old DataMap format that doesn't enforce the use of additional datamap chunks.
mod stream_old;
@@ -111,12 +113,14 @@ pub use xor_name::XorName;
pub use self::{
data_map::{ChunkInfo, DataMap},
error::{Error, Result},
stream::{streaming_decrypt, StreamingDecrypt},
stream_decrypt::{streaming_decrypt, DecryptionStream},
stream_encrypt::{stream_encrypt, ChunkStream, EncryptionStream},
stream_file::{streaming_decrypt_from_storage, streaming_encrypt_from_file},
stream_old::{StreamSelfDecryptor, StreamSelfEncryptor},
};
use bytes::Bytes;
use std::{
fs::{File, OpenOptions},
fs::File,
io::{Read, Write},
path::Path,
};
@@ -526,199 +530,6 @@ pub fn decrypt(data_map: &DataMap, chunks: &[EncryptedChunk]) -> Result<Bytes> {
decrypt_full_set(&root_map, &root_chunks)
}

/// Decrypts data from storage using streaming approach, processing chunks in batches
/// to minimize memory usage.
///
/// This function implements true streaming by:
/// 1. Processing chunks in ordered batches
/// 2. Fetching one batch at a time
/// 3. Decrypting and writing each batch immediately to disk
/// 4. Continuing until all chunks are processed
///
/// # Arguments
///
/// * `data_map` - The data map containing chunk information
/// * `output_filepath` - The path to write the decrypted data to
/// * `get_chunk_parallel` - A function that retrieves chunks in parallel given a list of XorName hashes
///
/// # Returns
///
/// * `Result<()>` - An empty result or an error if decryption fails
pub fn streaming_decrypt_from_storage<F>(
data_map: &DataMap,
output_filepath: &Path,
get_chunk_parallel: F,
) -> Result<()>
where
F: Fn(&[(usize, XorName)]) -> Result<Vec<(usize, Bytes)>>,
{
let root_map = if data_map.is_child() {
get_root_data_map_parallel(data_map.clone(), &get_chunk_parallel)?
} else {
data_map.clone()
};

// Get all chunk information and source hashes
let mut chunk_infos = root_map.infos().to_vec();
// Sort chunks by index to ensure proper order during processing
chunk_infos.sort_by_key(|info| info.index);
let src_hashes = extract_hashes(&root_map);

// Process chunks in batches to minimize memory usage
// Use a reasonable batch size - could be made configurable
const BATCH_SIZE: usize = 10;

for batch_start in (0..chunk_infos.len()).step_by(BATCH_SIZE) {
let batch_end = (batch_start + BATCH_SIZE).min(chunk_infos.len());
let batch_infos = &chunk_infos[batch_start..batch_end];

// Extract chunk hashes for this batch
let batch_hashes: Vec<_> = batch_infos
.iter()
.map(|info| (info.index, info.dst_hash))
.collect();

// Fetch only the chunks for this batch
let mut fetched_chunks = get_chunk_parallel(&batch_hashes)?;
// Must be sorted by index so the chunks can be appended to the file sequentially
fetched_chunks.sort_by_key(|(index, _content)| *index);

let batch_chunks = fetched_chunks
.into_iter()
.map(|(_index, content)| EncryptedChunk { content })
.collect::<Vec<_>>();

// Process and write this batch immediately to disk
for (i, (info, chunk)) in batch_infos.iter().zip(batch_chunks.iter()).enumerate() {
let decrypted_chunk = decrypt_chunk(info.index, &chunk.content, &src_hashes)?;

// For the first chunk in the entire process, create/overwrite the file
// For subsequent chunks, append to the file
if batch_start == 0 && i == 0 {
// First chunk: create/overwrite the file
let mut file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true) // Ensure we start with a clean file
.open(output_filepath)?;
file.write_all(&decrypted_chunk)?;
file.sync_all()?;
} else {
// Subsequent chunks: append to the file
append_to_file(output_filepath, &decrypted_chunk)?;
}
}
}

Ok(())
}
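
A minimal usage sketch of `streaming_decrypt_from_storage` (still re-exported after the move into the new `stream_file` module). The crate path `self_encryption`, the `fetch_chunk` lookup, and the `restore_file` helper are illustrative assumptions; a real backend would fetch each batch in parallel.

```rust
use std::path::Path;

use bytes::Bytes;
use self_encryption::{streaming_decrypt_from_storage, DataMap, Result};
use xor_name::XorName;

// Hypothetical lookup against whatever chunk store is in use.
fn fetch_chunk(hash: &XorName) -> Result<Bytes> {
    unimplemented!("retrieve the encrypted chunk content by its XorName")
}

fn restore_file(data_map: &DataMap, target: &Path) -> Result<()> {
    streaming_decrypt_from_storage(data_map, target, |names| {
        // The indices are passed through so the callee can re-order the batch
        // before appending it to the output file.
        let mut batch = Vec::with_capacity(names.len());
        for (index, hash) in names {
            batch.push((*index, fetch_chunk(hash)?));
        }
        Ok(batch)
    })
}
```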

/// Appends content to an existing file.
/// The file is opened and closed on each call rather than holding the handle open.
/// Note: This should only be used for chunks after the first one, as it appends to existing content.
fn append_to_file(file_path: &Path, content: &Bytes) -> std::io::Result<()> {
let mut file = OpenOptions::new()
.append(true)
.create(true)
.open(file_path)?;

file.write_all(content)?;
file.sync_all()?; // Ensure data is written to disk

Ok(())
}

/// Reads a file in chunks, encrypts them, and stores them using a provided functor.
/// Returns a DataMap.
pub fn streaming_encrypt_from_file<F>(file_path: &Path, mut chunk_store: F) -> Result<DataMap>
where
F: FnMut(XorName, Bytes) -> Result<()>,
{
use std::io::{BufReader, Read};

let file = File::open(file_path)?;
let file_size = file.metadata()?.len() as usize;

if file_size < MIN_ENCRYPTABLE_BYTES {
return Err(Error::Generic(format!(
"Too small for self-encryption! Required size at least {MIN_ENCRYPTABLE_BYTES}"
)));
}

let num_chunks = get_num_chunks(file_size);
if num_chunks < 3 {
return Err(Error::Generic(
"File must be large enough to generate at least 3 chunks".to_string(),
));
}

let mut reader = BufReader::with_capacity(MAX_CHUNK_SIZE, file);
let mut chunk_infos = Vec::with_capacity(num_chunks);

// Buffer to hold all source hashes
let mut src_hash_buffer = Vec::with_capacity(num_chunks);
let mut first_chunks = Vec::with_capacity(2);

// First pass: collect all source hashes
for chunk_index in 0..num_chunks {
let (start, end) = get_start_end_positions(file_size, chunk_index);
let chunk_size = end - start;
let mut chunk_data = vec![0u8; chunk_size];
reader.read_exact(&mut chunk_data)?;

let chunk_bytes = Bytes::from(chunk_data);
let src_hash = XorName::from_content(&chunk_bytes);
src_hash_buffer.push(src_hash);

if chunk_index < 2 {
first_chunks.push((chunk_index, chunk_bytes, chunk_size));
} else {
// Process chunks after the first two immediately
let pki = get_pad_key_and_iv(chunk_index, &src_hash_buffer);
let encrypted_content = encrypt::encrypt_chunk(chunk_bytes, pki)?;
let dst_hash = XorName::from_content(&encrypted_content);

chunk_store(dst_hash, encrypted_content)?;

chunk_infos.push(ChunkInfo {
index: chunk_index,
dst_hash,
src_hash,
src_size: chunk_size,
});
}
}

// Process first two chunks now that we have all hashes
for (chunk_index, chunk_data, chunk_size) in first_chunks {
let pki = get_pad_key_and_iv(chunk_index, &src_hash_buffer);
let encrypted_content = encrypt::encrypt_chunk(chunk_data, pki)?;
let dst_hash = XorName::from_content(&encrypted_content);

chunk_store(dst_hash, encrypted_content)?;

chunk_infos.insert(
chunk_index,
ChunkInfo {
index: chunk_index,
dst_hash,
src_hash: src_hash_buffer[chunk_index],
src_size: chunk_size,
},
);
}

// Create initial data map and shrink it
let data_map = DataMap::new(chunk_infos);
let (shrunk_map, _) = shrink_data_map(data_map, |hash, content| {
chunk_store(hash, content)?;
Ok(())
})?;

// Return the shrunk map - decrypt will handle getting back to the root map
Ok(shrunk_map)
}
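
A usage sketch for `streaming_encrypt_from_file`, keeping the encrypted chunks in an in-memory map. The crate path `self_encryption` and the `encrypt_to_memory` helper are assumptions for illustration; any store satisfying `FnMut(XorName, Bytes) -> Result<()>` works the same way.

```rust
use std::collections::HashMap;
use std::path::Path;

use bytes::Bytes;
use self_encryption::{streaming_encrypt_from_file, DataMap, Result};
use xor_name::XorName;

fn encrypt_to_memory(path: &Path) -> Result<(DataMap, HashMap<XorName, Bytes>)> {
    let mut store = HashMap::new();
    // The closure is invoked once per encrypted chunk as it is produced;
    // here the chunks are collected in memory, but a persistent backend
    // would persist them instead.
    let data_map = streaming_encrypt_from_file(path, |hash, content| {
        let _ = store.insert(hash, content);
        Ok(())
    })?;
    Ok((data_map, store))
}
```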

/// Recursively gets the root data map by decrypting child data maps using parallel chunk retrieval.
///
/// This function works similarly to `get_root_data_map`, but it retrieves chunks in parallel,
@@ -898,7 +709,13 @@ mod tests {
Ok(())
};

let data_map = streaming_encrypt_from_file(temp_file.path(), store)?;
// Use standard encryption which supports shrinking
let (data_map, encrypted_chunks) = encrypt(bytes)?;

// Store the chunks
for chunk in &encrypted_chunks {
store(XorName::from_content(&chunk.content), chunk.content.clone())?;
}
assert!(data_map.chunk_identifiers.len() <= 3);

Ok(())
18 changes: 9 additions & 9 deletions src/stream.rs → src/stream_decrypt.rs
@@ -1,4 +1,4 @@
// Copyright 2021 MaidSafe.net limited.
// Copyright 2025 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
@@ -28,7 +28,7 @@ const STREAM_DECRYPT_BATCH_SIZE: usize = 10;
/// In addition to sequential streaming, this struct also supports random access
/// to any byte range within the encrypted file using methods like `get_range()`,
/// `range()`, and other convenience methods.
pub struct StreamingDecrypt<F> {
pub struct DecryptionStream<F> {
chunk_infos: Vec<ChunkInfo>,
src_hashes: Vec<XorName>,
get_chunk_parallel: F,
@@ -37,7 +37,7 @@ pub struct StreamingDecrypt<F> {
current_batch_index: usize,
}

impl<F> StreamingDecrypt<F>
impl<F> DecryptionStream<F>
where
F: Fn(&[(usize, XorName)]) -> Result<Vec<(usize, Bytes)>>,
{
@@ -237,7 +237,7 @@ where
}
}

impl<F> Iterator for StreamingDecrypt<F>
impl<F> Iterator for DecryptionStream<F>
where
F: Fn(&[(usize, XorName)]) -> Result<Vec<(usize, Bytes)>>,
{
@@ -267,7 +267,7 @@ where
}
}

impl<F> StreamingDecrypt<F>
impl<F> DecryptionStream<F>
where
F: Fn(&[(usize, XorName)]) -> Result<Vec<(usize, Bytes)>>,
{
@@ -338,7 +338,7 @@ where
/// and yielding them one at a time. It's ideal for large files where loading the entire
/// decrypted content into memory at once would be impractical.
///
/// The returned `StreamingDecrypt` struct supports both sequential iteration and random
/// The returned `DecryptionStream` struct supports both sequential iteration and random
/// access to any byte range within the encrypted file.
///
/// # Arguments
@@ -348,7 +348,7 @@ where
///
/// # Returns
///
/// * `Result<StreamingDecrypt<F>>` - An iterator that yields `Result<Bytes>` for each decrypted chunk
/// * `Result<DecryptionStream<F>>` - An iterator that yields `Result<Bytes>` for each decrypted chunk
///
/// # Examples
///
@@ -451,11 +451,11 @@ where
pub fn streaming_decrypt<F>(
data_map: &DataMap,
get_chunk_parallel: F,
) -> Result<StreamingDecrypt<F>>
) -> Result<DecryptionStream<F>>
where
F: Fn(&[(usize, XorName)]) -> Result<Vec<(usize, Bytes)>>,
{
StreamingDecrypt::new(data_map, get_chunk_parallel)
DecryptionStream::new(data_map, get_chunk_parallel)
}
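
A sketch of driving `streaming_decrypt` sequentially through the `Iterator` impl of the returned `DecryptionStream`, which yields one decrypted `Result<Bytes>` per chunk in order. The in-memory `HashMap` store and the `write_decrypted` helper are assumptions for illustration; the stream also supports random access via `get_range()`/`range()` as documented above.

```rust
use std::collections::HashMap;
use std::io::Write;

use bytes::Bytes;
use self_encryption::{streaming_decrypt, DataMap, Error, Result};
use xor_name::XorName;

fn write_decrypted(
    data_map: &DataMap,
    store: &HashMap<XorName, Bytes>,
    mut out: impl Write,
) -> Result<()> {
    let stream = streaming_decrypt(data_map, |names| {
        // Resolve each requested (index, hash) pair from the in-memory store.
        let mut batch = Vec::with_capacity(names.len());
        for (index, hash) in names {
            let content = store
                .get(hash)
                .cloned()
                .ok_or_else(|| Error::Generic(format!("missing chunk {hash:?}")))?;
            batch.push((*index, content));
        }
        Ok(batch)
    })?;

    // Decrypted chunks arrive in order; write each one as it is produced.
    for chunk in stream {
        out.write_all(&chunk?)?;
    }
    Ok(())
}
```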

#[cfg(test)]