Skip to content
35 changes: 18 additions & 17 deletions datafusion/physical-plan/benches/aggregate_vectorized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use arrow::array::ArrayRef;
use arrow::buffer::BooleanBuffer;
use arrow::datatypes::{Int32Type, StringViewType};
use arrow::util::bench_util::{
create_primitive_array, create_string_view_array_with_len,
Expand All @@ -29,7 +30,7 @@ use criterion::{
};
use datafusion_physical_plan::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder;
use datafusion_physical_plan::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder;
use datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupColumn;
use datafusion_physical_plan::aggregates::group_values::multi_group_by::{FixedBitPackedMutableBuffer, GroupColumn};
use rand::distr::{Bernoulli, Distribution};
use std::hint::black_box;
use std::sync::Arc;
Expand Down Expand Up @@ -118,7 +119,7 @@ fn bytes_bench(
rows,
input,
"all_true",
vec![true; size],
&BooleanBuffer::new_set(size),
);
vectorized_equal_to(
group,
Expand All @@ -127,10 +128,10 @@ fn bytes_bench(
rows,
input,
"0.75 true",
{
&{
let mut rng = seedable_rng();
let d = Bernoulli::new(0.75).unwrap();
(0..size).map(|_| d.sample(&mut rng)).collect::<Vec<_>>()
BooleanBuffer::from_iter((0..size).map(|_| d.sample(&mut rng)))
},
);
vectorized_equal_to(
Expand All @@ -140,10 +141,10 @@ fn bytes_bench(
rows,
input,
"0.5 true",
{
&{
let mut rng = seedable_rng();
let d = Bernoulli::new(0.5).unwrap();
(0..size).map(|_| d.sample(&mut rng)).collect::<Vec<_>>()
BooleanBuffer::from_iter((0..size).map(|_| d.sample(&mut rng)))
},
);
vectorized_equal_to(
Expand All @@ -153,10 +154,10 @@ fn bytes_bench(
rows,
input,
"0.25 true",
{
&{
let mut rng = seedable_rng();
let d = Bernoulli::new(0.25).unwrap();
(0..size).map(|_| d.sample(&mut rng)).collect::<Vec<_>>()
BooleanBuffer::from_iter((0..size).map(|_| d.sample(&mut rng)))
},
);
// Not adding 0 true case here as if we optimize for 0 true cases the caller should avoid calling this method at all
Expand Down Expand Up @@ -226,7 +227,7 @@ fn bench_single_primitive<const NULLABLE: bool>(
rows,
&input,
"all_true",
vec![true; size],
&BooleanBuffer::new_set(size),
);
vectorized_equal_to(
group,
Expand All @@ -235,10 +236,10 @@ fn bench_single_primitive<const NULLABLE: bool>(
rows,
&input,
"0.75 true",
{
&{
let mut rng = seedable_rng();
let d = Bernoulli::new(0.75).unwrap();
(0..size).map(|_| d.sample(&mut rng)).collect::<Vec<_>>()
BooleanBuffer::from_iter((0..size).map(|_| d.sample(&mut rng)))
},
);
vectorized_equal_to(
Expand All @@ -248,10 +249,10 @@ fn bench_single_primitive<const NULLABLE: bool>(
rows,
&input,
"0.5 true",
{
&{
let mut rng = seedable_rng();
let d = Bernoulli::new(0.5).unwrap();
(0..size).map(|_| d.sample(&mut rng)).collect::<Vec<_>>()
BooleanBuffer::from_iter((0..size).map(|_| d.sample(&mut rng)))
},
);
vectorized_equal_to(
Expand All @@ -261,10 +262,10 @@ fn bench_single_primitive<const NULLABLE: bool>(
rows,
&input,
"0.25 true",
{
&{
let mut rng = seedable_rng();
let d = Bernoulli::new(0.25).unwrap();
(0..size).map(|_| d.sample(&mut rng)).collect::<Vec<_>>()
BooleanBuffer::from_iter((0..size).map(|_| d.sample(&mut rng)))
},
);
// Not adding 0 true case here as if we optimize for 0 true cases the caller should avoid calling this method at all
Expand All @@ -279,7 +280,7 @@ fn vectorized_equal_to<GroupColumnBuilder: GroupColumn>(
rows: &[usize],
input: &ArrayRef,
equal_to_result_description: &str,
equal_to_results: Vec<bool>,
equal_to_results: &BooleanBuffer,
) {
let id = BenchmarkId::new(
function_name,
Expand All @@ -291,7 +292,7 @@ fn vectorized_equal_to<GroupColumnBuilder: GroupColumn>(
b.iter(|| {
// Cloning is a must as `vectorized_equal_to` will modify the input vec
// and without cloning all benchmarks after the first one won't be meaningful
let mut equal_to_results = equal_to_results.clone();
let mut equal_to_results = FixedBitPackedMutableBuffer::from(equal_to_results);
builder.vectorized_equal_to(rows, input, rows, &mut equal_to_results);

// Make sure that the compiler does not optimize away the call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::sync::Arc;

use crate::aggregates::group_values::multi_group_by::Nulls;
use crate::aggregates::group_values::multi_group_by::{FixedBitPackedMutableBuffer, Nulls};
use crate::aggregates::group_values::multi_group_by::{nulls_equal_to, GroupColumn};
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use arrow::array::{Array as _, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder};
Expand Down Expand Up @@ -81,32 +81,31 @@ impl<const NULLABLE: bool> GroupColumn for BooleanGroupValueBuilder<NULLABLE> {
lhs_rows: &[usize],
array: &ArrayRef,
rhs_rows: &[usize],
equal_to_results: &mut [bool],
equal_to_results: &mut FixedBitPackedMutableBuffer,
) {
let array = array.as_boolean();

let iter = izip!(
lhs_rows.iter(),
rhs_rows.iter(),
equal_to_results.iter_mut(),
);

for (&lhs_row, &rhs_row, equal_to_result) in iter {
for (index, (&lhs_row, &rhs_row)) in iter.enumerate() {
// Has found not equal to in previous column, don't need to check
if !*equal_to_result {
if !equal_to_results.0.get_bit(index) {
continue;
}

if NULLABLE {
let exist_null = self.nulls.is_null(lhs_row);
let input_null = array.is_null(rhs_row);
if let Some(result) = nulls_equal_to(exist_null, input_null) {
*equal_to_result = result;
equal_to_results.0.set_bit(index, result);
continue;
}
}

*equal_to_result = self.buffer.get_bit(lhs_row) == array.value(rhs_row);
equal_to_results.0.set_bit(index, self.buffer.get_bit(lhs_row) == array.value(rhs_row));
}
}

Expand Down Expand Up @@ -213,10 +212,10 @@ mod tests {
lhs_rows: &[usize],
input_array: &ArrayRef,
rhs_rows: &[usize],
equal_to_results: &mut Vec<bool>| {
equal_to_results: &mut FixedBitPackedMutableBuffer| {
let iter = lhs_rows.iter().zip(rhs_rows.iter());
for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() {
equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row);
equal_to_results.set_bit(idx, builder.equal_to(lhs_row, input_array, rhs_row));
}
};

Expand All @@ -237,7 +236,7 @@ mod tests {
lhs_rows: &[usize],
input_array: &ArrayRef,
rhs_rows: &[usize],
equal_to_results: &mut Vec<bool>| {
equal_to_results: &mut FixedBitPackedMutableBuffer| {
builder.vectorized_equal_to(
lhs_rows,
input_array,
Expand All @@ -257,7 +256,7 @@ mod tests {
&[usize],
&ArrayRef,
&[usize],
&mut Vec<bool>,
&mut FixedBitPackedMutableBuffer,
),
{
// Will cover such cases:
Expand Down Expand Up @@ -302,14 +301,15 @@ mod tests {
let input_array = Arc::new(BooleanArray::new(values, nulls.finish())) as ArrayRef;

// Check
let mut equal_to_results = vec![true; builder.len()];
let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(builder.len());
equal_to(
&builder,
&[0, 1, 2, 3, 4, 5],
&input_array,
&[0, 1, 2, 3, 4, 5],
&mut equal_to_results,
);
let equal_to_results: Vec<bool> = equal_to_results.into();

assert!(!equal_to_results[0]);
assert!(equal_to_results[1]);
Expand All @@ -333,10 +333,10 @@ mod tests {
lhs_rows: &[usize],
input_array: &ArrayRef,
rhs_rows: &[usize],
equal_to_results: &mut Vec<bool>| {
equal_to_results: &mut FixedBitPackedMutableBuffer| {
let iter = lhs_rows.iter().zip(rhs_rows.iter());
for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() {
equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row);
equal_to_results.set_bit(idx, builder.equal_to(lhs_row, input_array, rhs_row));
}
};

Expand All @@ -357,7 +357,7 @@ mod tests {
lhs_rows: &[usize],
input_array: &ArrayRef,
rhs_rows: &[usize],
equal_to_results: &mut Vec<bool>| {
equal_to_results: &mut FixedBitPackedMutableBuffer| {
builder.vectorized_equal_to(
lhs_rows,
input_array,
Expand All @@ -377,7 +377,7 @@ mod tests {
&[usize],
&ArrayRef,
&[usize],
&mut Vec<bool>,
&mut FixedBitPackedMutableBuffer,
),
{
// Will cover such cases:
Expand All @@ -403,14 +403,15 @@ mod tests {
])) as ArrayRef;

// Check
let mut equal_to_results = vec![true; builder.len()];
let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(builder.len());
equal_to(
&builder,
&[0, 1, 2, 3],
&input_array,
&[0, 1, 2, 3],
&mut equal_to_results,
);
let equal_to_results: Vec<bool> = equal_to_results.into();

assert!(equal_to_results[0]);
assert!(!equal_to_results[1]);
Expand All @@ -432,13 +433,14 @@ mod tests {
.vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4])
.unwrap();

let mut equal_to_results = vec![true; all_nulls_input_array.len()];
let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(all_nulls_input_array.len());
builder.vectorized_equal_to(
&[0, 1, 2, 3, 4],
&all_nulls_input_array,
&[0, 1, 2, 3, 4],
&mut equal_to_results,
);
let equal_to_results: Vec<bool> = equal_to_results.into();

assert!(equal_to_results[0]);
assert!(equal_to_results[1]);
Expand All @@ -458,13 +460,14 @@ mod tests {
.vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4])
.unwrap();

let mut equal_to_results = vec![true; all_not_nulls_input_array.len()];
let mut equal_to_results = FixedBitPackedMutableBuffer::new_set(all_not_nulls_input_array.len());
builder.vectorized_equal_to(
&[5, 6, 7, 8, 9],
&all_not_nulls_input_array,
&[0, 1, 2, 3, 4],
&mut equal_to_results,
);
let equal_to_results: Vec<bool> = equal_to_results.into();

assert!(equal_to_results[0]);
assert!(equal_to_results[1]);
Expand Down
Loading
Loading