-
Notifications
You must be signed in to change notification settings - Fork 1.9k
perf: improve performance of array_union/array_intersect with batched row conversion
#20243
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c396950
01affff
029bfb3
0eb66b9
605cf42
8284bbb
0e2dc47
e718bc7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,169 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| #[macro_use] | ||
| extern crate criterion; | ||
|
|
||
| use arrow::array::{ArrayRef, Int64Array, ListArray}; | ||
| use arrow::buffer::OffsetBuffer; | ||
| use arrow::datatypes::{DataType, Field}; | ||
| use criterion::{BenchmarkId, Criterion}; | ||
| use datafusion_common::config::ConfigOptions; | ||
| use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; | ||
| use datafusion_functions_nested::set_ops::{ArrayIntersect, ArrayUnion}; | ||
| use rand::SeedableRng; | ||
| use rand::prelude::SliceRandom; | ||
| use rand::rngs::StdRng; | ||
| use std::collections::HashSet; | ||
| use std::hint::black_box; | ||
| use std::sync::Arc; | ||
|
|
||
| const NUM_ROWS: usize = 1000; | ||
| const ARRAY_SIZES: &[usize] = &[10, 50, 100]; | ||
| const SEED: u64 = 42; | ||
|
|
||
| fn criterion_benchmark(c: &mut Criterion) { | ||
| bench_array_union(c); | ||
| bench_array_intersect(c); | ||
| } | ||
|
|
||
| fn invoke_udf(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { | ||
| black_box( | ||
| udf.invoke_with_args(ScalarFunctionArgs { | ||
| args: vec![ | ||
| ColumnarValue::Array(array1.clone()), | ||
| ColumnarValue::Array(array2.clone()), | ||
| ], | ||
| arg_fields: vec![ | ||
| Field::new("arr1", array1.data_type().clone(), false).into(), | ||
| Field::new("arr2", array2.data_type().clone(), false).into(), | ||
| ], | ||
| number_rows: NUM_ROWS, | ||
| return_field: Field::new("result", array1.data_type().clone(), false).into(), | ||
| config_options: Arc::new(ConfigOptions::default()), | ||
| }) | ||
| .unwrap(), | ||
| ); | ||
| } | ||
|
|
||
| fn bench_array_union(c: &mut Criterion) { | ||
| let mut group = c.benchmark_group("array_union"); | ||
| let udf = ArrayUnion::new(); | ||
|
|
||
| for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] { | ||
| for &array_size in ARRAY_SIZES { | ||
| let (array1, array2) = | ||
| create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio); | ||
| group.bench_with_input( | ||
| BenchmarkId::new(*overlap_label, array_size), | ||
| &array_size, | ||
| |b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)), | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| group.finish(); | ||
| } | ||
|
|
||
| fn bench_array_intersect(c: &mut Criterion) { | ||
| let mut group = c.benchmark_group("array_intersect"); | ||
| let udf = ArrayIntersect::new(); | ||
|
|
||
| for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] { | ||
| for &array_size in ARRAY_SIZES { | ||
| let (array1, array2) = | ||
| create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio); | ||
| group.bench_with_input( | ||
| BenchmarkId::new(*overlap_label, array_size), | ||
| &array_size, | ||
| |b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)), | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| group.finish(); | ||
| } | ||
|
|
||
| fn create_arrays_with_overlap( | ||
| num_rows: usize, | ||
| array_size: usize, | ||
| overlap_ratio: f64, | ||
| ) -> (ArrayRef, ArrayRef) { | ||
| assert!((0.0..=1.0).contains(&overlap_ratio)); | ||
| let overlap_count = ((array_size as f64) * overlap_ratio).round() as usize; | ||
|
|
||
| let mut rng = StdRng::seed_from_u64(SEED); | ||
|
|
||
| let mut values1 = Vec::with_capacity(num_rows * array_size); | ||
| let mut values2 = Vec::with_capacity(num_rows * array_size); | ||
|
|
||
| for row in 0..num_rows { | ||
| let base = (row as i64) * (array_size as i64) * 2; | ||
|
|
||
| for i in 0..array_size { | ||
| values1.push(base + i as i64); | ||
| } | ||
|
|
||
| let mut positions: Vec<usize> = (0..array_size).collect(); | ||
| positions.shuffle(&mut rng); | ||
|
|
||
| let overlap_positions: HashSet<_> = | ||
| positions[..overlap_count].iter().copied().collect(); | ||
|
|
||
| for i in 0..array_size { | ||
| if overlap_positions.contains(&i) { | ||
| values2.push(base + i as i64); | ||
| } else { | ||
| values2.push(base + array_size as i64 + i as i64); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let values1 = Int64Array::from(values1); | ||
| let values2 = Int64Array::from(values2); | ||
|
|
||
| let field = Arc::new(Field::new("item", DataType::Int64, true)); | ||
|
|
||
| let offsets = (0..=num_rows) | ||
| .map(|i| (i * array_size) as i32) | ||
| .collect::<Vec<i32>>(); | ||
|
|
||
| let array1 = Arc::new( | ||
| ListArray::try_new( | ||
| field.clone(), | ||
| OffsetBuffer::new(offsets.clone().into()), | ||
| Arc::new(values1), | ||
| None, | ||
| ) | ||
| .unwrap(), | ||
| ); | ||
|
|
||
| let array2 = Arc::new( | ||
| ListArray::try_new( | ||
| field, | ||
| OffsetBuffer::new(offsets.into()), | ||
| Arc::new(values2), | ||
| None, | ||
| ) | ||
| .unwrap(), | ||
| ); | ||
|
|
||
| (array1, array2) | ||
| } | ||
|
|
||
| criterion_group!(benches, criterion_benchmark); | ||
| criterion_main!(benches); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -184,11 +184,17 @@ impl ScalarUDFImpl for ArrayUnion { | |
| ) | ||
| )] | ||
| #[derive(Debug, PartialEq, Eq, Hash)] | ||
| pub(super) struct ArrayIntersect { | ||
| pub struct ArrayIntersect { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is now public to be able to use it in the benchmark test (a separate crate).
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| signature: Signature, | ||
| aliases: Vec<String>, | ||
| } | ||
|
|
||
| impl Default for ArrayIntersect { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl ArrayIntersect { | ||
| pub fn new() -> Self { | ||
| Self { | ||
|
|
@@ -358,69 +364,117 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>( | |
| "{set_op:?} is not implemented for '{l:?}' and '{r:?}'" | ||
| ); | ||
|
|
||
| let mut offsets = vec![OffsetSize::usize_as(0)]; | ||
| let mut new_arrays = vec![]; | ||
| // Convert all values to rows in batch for performance. | ||
| let converter = RowConverter::new(vec![SortField::new(l.value_type())])?; | ||
| for (l_arr, r_arr) in l.iter().zip(r.iter()) { | ||
| let last_offset = *offsets.last().unwrap(); | ||
| let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?; | ||
| let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?; | ||
|
|
||
| let (l_values, r_values) = match (l_arr, r_arr) { | ||
| (Some(l_arr), Some(r_arr)) => ( | ||
| converter.convert_columns(&[l_arr])?, | ||
| converter.convert_columns(&[r_arr])?, | ||
| ), | ||
| _ => { | ||
| offsets.push(last_offset); | ||
| continue; | ||
| } | ||
| }; | ||
| match set_op { | ||
| SetOp::Union => generic_set_loop::<OffsetSize, true>( | ||
| l, r, &rows_l, &rows_r, field, &converter, | ||
| ), | ||
| SetOp::Intersect => generic_set_loop::<OffsetSize, false>( | ||
| l, r, &rows_l, &rows_r, field, &converter, | ||
| ), | ||
| } | ||
| } | ||
|
|
||
| let l_iter = l_values.iter().sorted().dedup(); | ||
| let values_set: HashSet<_> = l_iter.clone().collect(); | ||
| let mut rows = if set_op == SetOp::Union { | ||
| l_iter.collect() | ||
| } else { | ||
| vec![] | ||
| }; | ||
| /// Inner loop for set operations, parameterized by const generic to | ||
| /// avoid branching inside the hot loop. | ||
| fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>( | ||
| l: &GenericListArray<OffsetSize>, | ||
| r: &GenericListArray<OffsetSize>, | ||
| rows_l: &arrow::row::Rows, | ||
| rows_r: &arrow::row::Rows, | ||
| field: Arc<Field>, | ||
| converter: &RowConverter, | ||
| ) -> Result<ArrayRef> { | ||
| let l_offsets = l.value_offsets(); | ||
| let r_offsets = r.value_offsets(); | ||
|
|
||
| let mut result_offsets = Vec::with_capacity(l.len() + 1); | ||
| result_offsets.push(OffsetSize::usize_as(0)); | ||
| let initial_capacity = if IS_UNION { | ||
| // Union can include all elements from both sides | ||
| rows_l.num_rows() | ||
| } else { | ||
| // Intersect result is bounded by the smaller side | ||
| rows_l.num_rows().min(rows_r.num_rows()) | ||
| }; | ||
|
|
||
| let mut final_rows = Vec::with_capacity(initial_capacity); | ||
|
|
||
| // Reuse hash sets across iterations | ||
| let mut seen = HashSet::new(); | ||
| let mut lookup_set = HashSet::new(); | ||
| for i in 0..l.len() { | ||
| let last_offset = *result_offsets.last().unwrap(); | ||
|
|
||
| for r_val in r_values.iter().sorted().dedup() { | ||
| match set_op { | ||
| SetOp::Union => { | ||
| if !values_set.contains(&r_val) { | ||
| rows.push(r_val); | ||
| } | ||
| if l.is_null(i) || r.is_null(i) { | ||
| result_offsets.push(last_offset); | ||
| continue; | ||
| } | ||
|
|
||
| let l_start = l_offsets[i].as_usize(); | ||
| let l_end = l_offsets[i + 1].as_usize(); | ||
| let r_start = r_offsets[i].as_usize(); | ||
| let r_end = r_offsets[i + 1].as_usize(); | ||
|
|
||
| seen.clear(); | ||
|
|
||
| if IS_UNION { | ||
| for idx in l_start..l_end { | ||
| let row = rows_l.row(idx); | ||
| if seen.insert(row) { | ||
| final_rows.push(row); | ||
| } | ||
| SetOp::Intersect => { | ||
| if values_set.contains(&r_val) { | ||
| rows.push(r_val); | ||
| } | ||
| } | ||
| for idx in r_start..r_end { | ||
| let row = rows_r.row(idx); | ||
| if seen.insert(row) { | ||
| final_rows.push(row); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| offsets.push(last_offset + OffsetSize::usize_as(rows.len())); | ||
| let arrays = converter.convert_rows(rows)?; | ||
| let array = match arrays.first() { | ||
| Some(array) => Arc::clone(array), | ||
| None => { | ||
| return internal_err!("{set_op}: failed to get array from rows"); | ||
| } else { | ||
| let l_len = l_end - l_start; | ||
| let r_len = r_end - r_start; | ||
|
|
||
| // Select shorter side for lookup, longer side for probing | ||
| let (lookup_rows, lookup_range, probe_rows, probe_range) = if l_len < r_len { | ||
| (rows_l, l_start..l_end, rows_r, r_start..r_end) | ||
| } else { | ||
| (rows_r, r_start..r_end, rows_l, l_start..l_end) | ||
| }; | ||
| lookup_set.clear(); | ||
| lookup_set.reserve(lookup_range.len()); | ||
|
|
||
| // Build lookup table | ||
| for idx in lookup_range { | ||
| lookup_set.insert(lookup_rows.row(idx)); | ||
| } | ||
| }; | ||
|
|
||
| new_arrays.push(array); | ||
| // Probe and emit distinct intersected rows | ||
| for idx in probe_range { | ||
| let row = probe_rows.row(idx); | ||
| if lookup_set.contains(&row) && seen.insert(row) { | ||
| final_rows.push(row); | ||
| } | ||
| } | ||
| } | ||
| result_offsets.push(last_offset + OffsetSize::usize_as(seen.len())); | ||
| } | ||
|
|
||
| let offsets = OffsetBuffer::new(offsets.into()); | ||
| let new_arrays_ref: Vec<_> = new_arrays.iter().map(|v| v.as_ref()).collect(); | ||
| let values = if new_arrays_ref.is_empty() { | ||
| let final_values = if final_rows.is_empty() { | ||
| new_empty_array(&l.value_type()) | ||
| } else { | ||
| compute::concat(&new_arrays_ref)? | ||
| let arrays = converter.convert_rows(final_rows)?; | ||
| Arc::clone(&arrays[0]) | ||
| }; | ||
|
|
||
| let arr = GenericListArray::<OffsetSize>::try_new( | ||
| field, | ||
| offsets, | ||
| values, | ||
| OffsetBuffer::new(result_offsets.into()), | ||
| final_values, | ||
| NullBuffer::union(l.nulls(), r.nulls()), | ||
| )?; | ||
| Ok(Arc::new(arr)) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Slice::contains() is O(n) (linear search). Using a HashSet would be O(1), but
create_arrays_with_overlap()is called beforegroup.bench_with_input(...), so maybe it is OK.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.