diff --git a/Cargo.toml b/Cargo.toml index d8aa19b..8dc4047 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "orx-split-vec" -version = "3.20.0" +version = "3.21.0" edition = "2024" authors = ["orxfun "] description = "An efficient dynamic capacity vector with pinned element guarantees." @@ -12,7 +12,7 @@ categories = ["data-structures", "rust-patterns", "no-std"] [dependencies] orx-iterable = { version = "1.3.0", default-features = false } orx-pseudo-default = { version = "2.1.0", default-features = false } -orx-pinned-vec = { version = "3.18.0", default-features = false } +orx-pinned-vec = { version = "3.20.0", default-features = false } orx-concurrent-iter = { version = "3.1.0", default-features = false } [[bench]] @@ -24,4 +24,4 @@ criterion = "0.7.0" rand = { version = "0.9.2", default-features = false } rand_chacha = { version = "0.9", default-features = false } test-case = "3.3.1" -orx-concurrent-bag = "3.1.0" +# orx-concurrent-bag = "3.1.0" diff --git a/src/concurrent_iter/tests/con_iter_owned/mod.rs b/src/concurrent_iter/tests/con_iter_owned/mod.rs index bc15087..cd80447 100644 --- a/src/concurrent_iter/tests/con_iter_owned/mod.rs +++ b/src/concurrent_iter/tests/con_iter_owned/mod.rs @@ -1,4 +1,4 @@ -mod con_iter; -mod into; +// mod con_iter; +// mod into; mod par; mod transformations; diff --git a/src/concurrent_iter/tests/con_iter_ref/mod.rs b/src/concurrent_iter/tests/con_iter_ref/mod.rs index bc15087..cd80447 100644 --- a/src/concurrent_iter/tests/con_iter_ref/mod.rs +++ b/src/concurrent_iter/tests/con_iter_ref/mod.rs @@ -1,4 +1,4 @@ -mod con_iter; -mod into; +// mod con_iter; +// mod into; mod par; mod transformations; diff --git a/src/concurrent_pinned_vec/con_pinvec.rs b/src/concurrent_pinned_vec/con_pinvec.rs index 4e31272..c6a3947 100644 --- a/src/concurrent_pinned_vec/con_pinvec.rs +++ b/src/concurrent_pinned_vec/con_pinvec.rs @@ -1,7 +1,7 @@ use crate::{ Doubling, Fragment, GrowthWithConstantTimeAccess, SplitVec, common_traits::iterator::{IterOfSlicesOfCon, SliceBorrowAsMut, SliceBorrowAsRef}, - concurrent_pinned_vec::iter_ptr::IterPtrOfCon, + concurrent_pinned_vec::{into_iter::ConcurrentSplitVecIntoIter, iter_ptr::IterPtrOfCon}, fragment::transformations::{fragment_from_raw, fragment_into_raw}, }; use alloc::vec::Vec; @@ -10,10 +10,10 @@ use core::sync::atomic::{AtomicUsize, Ordering}; use core::{cell::UnsafeCell, ops::Range}; use orx_pinned_vec::ConcurrentPinnedVec; -struct FragmentData { - f: usize, - len: usize, - capacity: usize, +pub struct FragmentData { + pub f: usize, + pub len: usize, + pub capacity: usize, } /// Concurrent wrapper ([`orx_pinned_vec::ConcurrentPinnedVec`]) for the `SplitVec`. @@ -35,6 +35,15 @@ impl Drop for ConcurrentSplitVec { } impl ConcurrentSplitVec { + pub(super) fn destruct(mut self) -> (G, Vec>, usize) { + let mut data = Vec::new(); + core::mem::swap(&mut self.data, &mut data); + let capacity = self.capacity.load(Ordering::Relaxed); + let growth = self.growth.clone(); + self.zero(); + (growth, data, capacity) + } + unsafe fn get_raw_mut_unchecked_fi(&self, f: usize, i: usize) -> *mut T { let p = unsafe { *self.data[f].get() }; unsafe { p.add(i) } @@ -195,6 +204,8 @@ impl ConcurrentPinnedVec for ConcurrentSp where Self: 'a; + type IntoIter = ConcurrentSplitVecIntoIter; + unsafe fn into_inner(mut self, len: usize) -> Self::P { let mut fragments = Vec::with_capacity(self.max_num_fragments); let mut take_fragment = |fragment| fragments.push(fragment); @@ -434,4 +445,9 @@ impl ConcurrentPinnedVec for ConcurrentSp unsafe fn ptr_iter_unchecked(&self, range: Range) -> Self::PtrIter<'_> { IterPtrOfCon::new(self.capacity(), &self.data, self.growth.clone(), range) } + + unsafe fn into_iter(self, range: Range) -> Self::IntoIter { + let (growth, data, capacity) = self.destruct(); + ConcurrentSplitVecIntoIter::new(capacity, data, growth, range) + } } diff --git a/src/concurrent_pinned_vec/into_iter.rs b/src/concurrent_pinned_vec/into_iter.rs new file mode 100644 index 0000000..ea3d300 --- /dev/null +++ b/src/concurrent_pinned_vec/into_iter.rs @@ -0,0 +1,117 @@ +use crate::{ + GrowthWithConstantTimeAccess, + concurrent_pinned_vec::into_iter_ptr_slices::IntoIterPtrOfConSlices, +}; +use alloc::vec::Vec; +use core::{cell::UnsafeCell, ops::Range}; + +pub struct ConcurrentSplitVecIntoIter +where + G: GrowthWithConstantTimeAccess, +{ + slices: IntoIterPtrOfConSlices, + len_of_remaining_slices: usize, + current_ptr: *const T, + current_last: *const T, +} + +impl ConcurrentSplitVecIntoIter +where + G: GrowthWithConstantTimeAccess, +{ + pub fn new( + capacity: usize, + fragments: Vec>, + growth: G, + range: Range, + ) -> Self { + let len_of_remaining_slices = range.len(); + let slices = IntoIterPtrOfConSlices::new(capacity, fragments, growth, range); + Self { + slices, + len_of_remaining_slices, + current_ptr: core::ptr::null(), + current_last: core::ptr::null(), + } + } + + fn remaining(&self) -> usize { + let remaining_current = match self.current_ptr.is_null() { + true => 0, + // SAFETY: whenever current_ptr is not null, we know that current_last is also not + // null which is >= current_ptr. + false => unsafe { self.current_last.offset_from(self.current_ptr) as usize + 1 }, + }; + + self.len_of_remaining_slices + remaining_current + } + + fn next_ptr(&mut self) -> Option<*mut T> { + match self.current_ptr { + ptr if ptr.is_null() => self.next_slice(), + ptr if ptr == self.current_last => { + self.current_ptr = core::ptr::null_mut(); + Some(ptr as *mut T) + } + ptr => { + // SAFETY: current_ptr is not the last element, hance current_ptr+1 is in bounds + self.current_ptr = unsafe { self.current_ptr.add(1) }; + + // SAFETY: ptr is valid and its value can be taken. + // Drop will skip this position which is now uninitialized. + Some(ptr as *mut T) + } + } + } + + fn next_slice(&mut self) -> Option<*mut T> { + self.slices.next().and_then(|(ptr, len)| { + debug_assert!(len > 0); + self.len_of_remaining_slices -= len; + // SAFETY: pointers are not null since slice is not empty + self.current_ptr = ptr; + self.current_last = unsafe { ptr.add(len - 1) }; + self.next_ptr() + }) + } +} + +impl Iterator for ConcurrentSplitVecIntoIter +where + G: GrowthWithConstantTimeAccess, +{ + type Item = T; + + #[inline(always)] + fn next(&mut self) -> Option { + self.next_ptr().map(|ptr| unsafe { ptr.read() }) + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.remaining(); + (len, Some(len)) + } +} + +impl ExactSizeIterator for ConcurrentSplitVecIntoIter +where + G: GrowthWithConstantTimeAccess, +{ + fn len(&self) -> usize { + self.remaining() + } +} + +impl Drop for ConcurrentSplitVecIntoIter +where + G: GrowthWithConstantTimeAccess, +{ + fn drop(&mut self) { + if core::mem::needs_drop::() { + while let Some(ptr) = self.next_ptr() { + // SAFETY: ptr is in bounds and have not been dropped yet + unsafe { ptr.drop_in_place() }; + } + } + } +} diff --git a/src/concurrent_pinned_vec/into_iter_ptr_slices.rs b/src/concurrent_pinned_vec/into_iter_ptr_slices.rs new file mode 100644 index 0000000..24c5c97 --- /dev/null +++ b/src/concurrent_pinned_vec/into_iter_ptr_slices.rs @@ -0,0 +1,185 @@ +use crate::{ + GrowthWithConstantTimeAccess, + fragment::transformations::fragment_from_raw, + range_helpers::{range_end, range_start}, +}; +use alloc::vec::Vec; +use core::cmp::min; +use core::{cell::UnsafeCell, iter::FusedIterator, ops::Range}; + +#[derive(Default)] +pub(super) struct IntoIterPtrOfConSlices +where + G: GrowthWithConstantTimeAccess, +{ + fragments: Vec>, + growth: G, + sf: usize, + si: usize, + si_end: usize, + ef: usize, + ei: usize, + f: usize, +} + +impl Drop for IntoIterPtrOfConSlices +where + G: GrowthWithConstantTimeAccess, +{ + fn drop(&mut self) { + Self::drop_fragments(&self.growth, &mut self.fragments); + } +} + +impl IntoIterPtrOfConSlices +where + G: GrowthWithConstantTimeAccess, +{ + fn empty() -> Self { + Self { + fragments: Default::default(), + growth: G::pseudo_default(), + sf: 0, + si: 0, + si_end: 0, + ef: 0, + ei: 0, + f: 1, + } + } + + fn single_slice( + fragments: Vec>, + growth: G, + f: usize, + begin: usize, + end: usize, + ) -> Self { + Self { + fragments, + growth, + sf: f, + si: begin, + si_end: end, + ef: f, + ei: 0, + f, + } + } + + pub fn new( + capacity: usize, + mut fragments: Vec>, + growth: G, + range: Range, + ) -> Self { + let fragment_and_inner_indices = |i| growth.get_fragment_and_inner_indices_unchecked(i); + + let a = range_start(&range); + let b = min(capacity, range_end(&range, capacity)); + + match b.saturating_sub(a) { + 0 => { + Self::drop_fragments(&growth, &mut fragments); + Self::empty() + } + _ => { + let (sf, si) = fragment_and_inner_indices(a); + let (ef, ei) = fragment_and_inner_indices(b - 1); + + match sf == ef { + true => Self::single_slice(fragments, growth, sf, si, ei + 1), + false => { + let si_end = growth.fragment_capacity_of(sf); + Self { + fragments, + growth, + sf, + si, + si_end, + ef, + ei, + f: sf, + } + } + } + } + } + } + + fn drop_fragments(growth: &G, fragments: &mut [UnsafeCell<*mut T>]) { + for (f, cell) in fragments.iter().enumerate() { + let ptr = unsafe { *cell.get() }; + match ptr.is_null() { + true => continue, + false => { + let capacity = growth.fragment_capacity_of(f); + let _fragment_to_drop = unsafe { fragment_from_raw(ptr, 0, capacity) }; + } + } + } + } + + #[inline(always)] + fn remaining_len(&self) -> usize { + (1 + self.ef).saturating_sub(self.f) + } + + #[inline(always)] + fn get_ptr_fi(&self, f: usize, i: usize) -> *mut T { + let p = unsafe { *self.fragments[f].get() }; + unsafe { p.add(i) } + } + + #[inline(always)] + fn capacity_of(&self, f: usize) -> usize { + self.growth.fragment_capacity_of(f) + } +} + +impl Iterator for IntoIterPtrOfConSlices +where + G: GrowthWithConstantTimeAccess, +{ + type Item = (*mut T, usize); + + fn next(&mut self) -> Option { + match self.f { + f if f == self.sf => { + self.f += 1; + let len = self.si_end - self.si; + let p = self.get_ptr_fi(self.sf, self.si); + Some((p, len)) + } + f if f < self.ef => { + self.f += 1; + let len = self.capacity_of(f); + let p = self.get_ptr_fi(f, 0); + Some((p, len)) + } + f if f == self.ef => { + self.f += 1; + let len = self.ei + 1; + let p = self.get_ptr_fi(self.ef, 0); + Some((p, len)) + } + _ => None, + } + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.remaining_len(); + (len, Some(len)) + } +} + +impl FusedIterator for IntoIterPtrOfConSlices where G: GrowthWithConstantTimeAccess {} + +impl ExactSizeIterator for IntoIterPtrOfConSlices +where + G: GrowthWithConstantTimeAccess, +{ + fn len(&self) -> usize { + self.remaining_len() + } +} diff --git a/src/concurrent_pinned_vec/iter_ptr.rs b/src/concurrent_pinned_vec/iter_ptr.rs index eb84fb3..4280741 100644 --- a/src/concurrent_pinned_vec/iter_ptr.rs +++ b/src/concurrent_pinned_vec/iter_ptr.rs @@ -13,6 +13,20 @@ where current_last: *const T, } +impl<'a, T, G> Default for IterPtrOfCon<'a, T, G> +where + G: GrowthWithConstantTimeAccess, +{ + fn default() -> Self { + Self { + slices: IterPtrOfConSlices::default(), + len_of_remaining_slices: 0, + current_ptr: core::ptr::null(), + current_last: core::ptr::null(), + } + } +} + impl<'a, T, G> IterPtrOfCon<'a, T, G> where G: GrowthWithConstantTimeAccess, diff --git a/src/concurrent_pinned_vec/iter_ptr_slices.rs b/src/concurrent_pinned_vec/iter_ptr_slices.rs index f956bc4..4c600f6 100644 --- a/src/concurrent_pinned_vec/iter_ptr_slices.rs +++ b/src/concurrent_pinned_vec/iter_ptr_slices.rs @@ -19,6 +19,15 @@ where f: usize, } +impl<'a, T, G> Default for IterPtrOfConSlices<'a, T, G> +where + G: GrowthWithConstantTimeAccess, +{ + fn default() -> Self { + Self::empty() + } +} + impl<'a, T, G> IterPtrOfConSlices<'a, T, G> where G: GrowthWithConstantTimeAccess, diff --git a/src/concurrent_pinned_vec/mod.rs b/src/concurrent_pinned_vec/mod.rs index 134f191..da01639 100644 --- a/src/concurrent_pinned_vec/mod.rs +++ b/src/concurrent_pinned_vec/mod.rs @@ -1,4 +1,9 @@ +#[cfg(test)] +mod tests; + mod con_pinvec; +mod into_iter; +mod into_iter_ptr_slices; mod iter_ptr; mod iter_ptr_slices; diff --git a/src/concurrent_pinned_vec/tests/into_iter.rs b/src/concurrent_pinned_vec/tests/into_iter.rs new file mode 100644 index 0000000..4f7d1f7 --- /dev/null +++ b/src/concurrent_pinned_vec/tests/into_iter.rs @@ -0,0 +1,162 @@ +use crate::{ + Doubling, GrowthWithConstantTimeAccess, Linear, SplitVec, + concurrent_pinned_vec::into_iter::ConcurrentSplitVecIntoIter, +}; +use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec, PinnedVec}; +use std::string::{String, ToString}; +use test_case::test_matrix; + +fn vec_doubling(n: usize) -> SplitVec { + (0..n).map(|x| x.to_string()).collect() +} + +fn vec_linear(n: usize) -> SplitVec { + let mut vec = SplitVec::with_linear_growth(2); + vec.extend((0..n).map(|x| x.to_string())); + vec +} + +#[test_matrix([vec_doubling, vec_linear])] +fn into_iter_empty(vec: F) +where + G: GrowthWithConstantTimeAccess, + F: Fn(usize) -> SplitVec, +{ + let iter = || { + let vec = vec(0); + let range = 0..vec.len(); + let convec = vec.into_concurrent(); + let (growth, data, capacity) = convec.destruct(); + ConcurrentSplitVecIntoIter::new(capacity, data, growth, range) + }; + + let consume_all = iter().count(); + assert_eq!(consume_all, 0); + + let mut consume_half = iter(); + for _ in 0..10 { + _ = consume_half.next(); + } + + let _consume_none = iter(); +} + +#[test_matrix([vec_doubling, vec_linear])] +fn into_iter_non_taken(vec: F) +where + G: GrowthWithConstantTimeAccess, + F: Fn(usize) -> SplitVec, +{ + let iter = || { + let vec = vec(20); + let range = 0..vec.len(); + let convec = vec.into_concurrent(); + let (growth, data, capacity) = convec.destruct(); + ConcurrentSplitVecIntoIter::new(capacity, data, growth, range) + }; + + let consume_all = iter().count(); + assert_eq!(consume_all, 20); + + let mut consume_half = iter(); + for _ in 0..10 { + _ = consume_half.next(); + } + + let _consume_none = iter(); +} + +#[test_matrix([vec_doubling, vec_linear])] +fn into_iter_taken_from_beg(vec: F) +where + G: GrowthWithConstantTimeAccess, + F: Fn(usize) -> SplitVec, +{ + let iter = || { + let vec = vec(20); + let range = 5..vec.len(); + let convec = vec.into_concurrent(); + + for i in 0..range.start { + let p = unsafe { convec.get_ptr_mut(i) }; + let _value = unsafe { p.read() }; + } + + let (growth, data, capacity) = convec.destruct(); + ConcurrentSplitVecIntoIter::new(capacity, data, growth, range) + }; + + let consume_all = iter().count(); + assert_eq!(consume_all, 15); + + let mut consume_half = iter(); + for _ in 0..10 { + _ = consume_half.next(); + } + + let _consume_none = iter(); +} + +#[test_matrix([vec_doubling, vec_linear])] +fn into_iter_taken_from_end(vec: F) +where + G: GrowthWithConstantTimeAccess, + F: Fn(usize) -> SplitVec, +{ + let iter = || { + let vec = vec(20); + let vec_len = vec.len(); + let range = 0..15; + let convec = vec.into_concurrent(); + + for i in range.end..vec_len { + let p = unsafe { convec.get_ptr_mut(i) }; + let _value = unsafe { p.read() }; + } + + let (growth, data, capacity) = convec.destruct(); + ConcurrentSplitVecIntoIter::new(capacity, data, growth, range) + }; + + let consume_all = iter().count(); + assert_eq!(consume_all, 15); + + let mut consume_half = iter(); + for _ in 0..10 { + _ = consume_half.next(); + } + + let _consume_none = iter(); +} + +#[test_matrix([vec_doubling, vec_linear])] +fn into_iter_taken_from_both_ends(vec: F) +where + G: GrowthWithConstantTimeAccess, + F: Fn(usize) -> SplitVec, +{ + let iter = || { + let vec = vec(20); + let vec_len = vec.len(); + let range = 4..15; + let convec = vec.into_concurrent(); + + for i in (0..range.start).chain(range.end..vec_len) { + let p = unsafe { convec.get_ptr_mut(i) }; + let _value = unsafe { p.read() }; + } + + let (growth, data, capacity) = convec.destruct(); + ConcurrentSplitVecIntoIter::new(capacity, data, growth, range) + }; + + let consume_all = iter().count(); + assert_eq!(consume_all, 11); + + let mut consume_half = iter(); + for _ in 0..7 { + _ = consume_half.next(); + } + + let _consume_none = iter(); +} diff --git a/src/concurrent_pinned_vec/tests/iter_ptr.rs b/src/concurrent_pinned_vec/tests/iter_ptr.rs new file mode 100644 index 0000000..66d8a73 --- /dev/null +++ b/src/concurrent_pinned_vec/tests/iter_ptr.rs @@ -0,0 +1,17 @@ +use crate::GrowthWithConstantTimeAccess; +use crate::concurrent_pinned_vec::iter_ptr::IterPtrOfCon; +use crate::{Doubling, Linear}; +use std::string::String; +use test_case::test_matrix; + +#[test_matrix([ + Doubling, + Linear::new(4), +])] +fn iter_ptr_default(_: G) +where + G: GrowthWithConstantTimeAccess, +{ + let iter = IterPtrOfCon::::default(); + for _ in iter {} +} diff --git a/src/concurrent_pinned_vec/tests/mod.rs b/src/concurrent_pinned_vec/tests/mod.rs new file mode 100644 index 0000000..c6ea9db --- /dev/null +++ b/src/concurrent_pinned_vec/tests/mod.rs @@ -0,0 +1,2 @@ +mod into_iter; +mod iter_ptr;