diff --git a/datafusion/functions-nested/Cargo.toml b/datafusion/functions-nested/Cargo.toml index bfbfbd56c8baf..e5e601f30ae84 100644 --- a/datafusion/functions-nested/Cargo.toml +++ b/datafusion/functions-nested/Cargo.toml @@ -92,3 +92,7 @@ name = "array_remove" [[bench]] harness = false name = "array_repeat" + +[[bench]] +harness = false +name = "array_set_ops" diff --git a/datafusion/functions-nested/benches/array_set_ops.rs b/datafusion/functions-nested/benches/array_set_ops.rs new file mode 100644 index 0000000000000..2b21c50b27823 --- /dev/null +++ b/datafusion/functions-nested/benches/array_set_ops.rs @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; + +use arrow::array::{ArrayRef, Int64Array, ListArray}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::{DataType, Field}; +use criterion::{BenchmarkId, Criterion}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::set_ops::{ArrayIntersect, ArrayUnion}; +use rand::SeedableRng; +use rand::prelude::SliceRandom; +use rand::rngs::StdRng; +use std::collections::HashSet; +use std::hint::black_box; +use std::sync::Arc; + +const NUM_ROWS: usize = 1000; +const ARRAY_SIZES: &[usize] = &[10, 50, 100]; +const SEED: u64 = 42; + +fn criterion_benchmark(c: &mut Criterion) { + bench_array_union(c); + bench_array_intersect(c); +} + +fn invoke_udf(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(array1.clone()), + ColumnarValue::Array(array2.clone()), + ], + arg_fields: vec![ + Field::new("arr1", array1.data_type().clone(), false).into(), + Field::new("arr2", array2.data_type().clone(), false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new("result", array1.data_type().clone(), false).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ); +} + +fn bench_array_union(c: &mut Criterion) { + let mut group = c.benchmark_group("array_union"); + let udf = ArrayUnion::new(); + + for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] { + for &array_size in ARRAY_SIZES { + let (array1, array2) = + create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio); + group.bench_with_input( + BenchmarkId::new(*overlap_label, array_size), + &array_size, + |b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)), + ); + } + } + + group.finish(); +} + +fn bench_array_intersect(c: &mut Criterion) { + let mut group = c.benchmark_group("array_intersect"); + let udf = ArrayIntersect::new(); + + for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] { + for &array_size in ARRAY_SIZES { + let (array1, array2) = + create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio); + group.bench_with_input( + BenchmarkId::new(*overlap_label, array_size), + &array_size, + |b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)), + ); + } + } + + group.finish(); +} + +fn create_arrays_with_overlap( + num_rows: usize, + array_size: usize, + overlap_ratio: f64, +) -> (ArrayRef, ArrayRef) { + assert!((0.0..=1.0).contains(&overlap_ratio)); + let overlap_count = ((array_size as f64) * overlap_ratio).round() as usize; + + let mut rng = StdRng::seed_from_u64(SEED); + + let mut values1 = Vec::with_capacity(num_rows * array_size); + let mut values2 = Vec::with_capacity(num_rows * array_size); + + for row in 0..num_rows { + let base = (row as i64) * (array_size as i64) * 2; + + for i in 0..array_size { + values1.push(base + i as i64); + } + + let mut positions: Vec = (0..array_size).collect(); + positions.shuffle(&mut rng); + + let overlap_positions: HashSet<_> = + positions[..overlap_count].iter().copied().collect(); + + for i in 0..array_size { + if overlap_positions.contains(&i) { + values2.push(base + i as i64); + } else { + values2.push(base + array_size as i64 + i as i64); + } + } + } + + let values1 = Int64Array::from(values1); + let values2 = Int64Array::from(values2); + + let field = Arc::new(Field::new("item", DataType::Int64, true)); + + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + let array1 = Arc::new( + ListArray::try_new( + field.clone(), + OffsetBuffer::new(offsets.clone().into()), + Arc::new(values1), + None, + ) + .unwrap(), + ); + + let array2 = Arc::new( + ListArray::try_new( + field, + OffsetBuffer::new(offsets.into()), + Arc::new(values2), + None, + ) + .unwrap(), + ); + + (array1, array2) +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 8799af6d491c2..370599611feef 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -184,11 +184,17 @@ impl ScalarUDFImpl for ArrayUnion { ) )] #[derive(Debug, PartialEq, Eq, Hash)] -pub(super) struct ArrayIntersect { +pub struct ArrayIntersect { signature: Signature, aliases: Vec, } +impl Default for ArrayIntersect { + fn default() -> Self { + Self::new() + } +} + impl ArrayIntersect { pub fn new() -> Self { Self { @@ -358,69 +364,117 @@ fn generic_set_lists( "{set_op:?} is not implemented for '{l:?}' and '{r:?}'" ); - let mut offsets = vec![OffsetSize::usize_as(0)]; - let mut new_arrays = vec![]; + // Convert all values to rows in batch for performance. let converter = RowConverter::new(vec![SortField::new(l.value_type())])?; - for (l_arr, r_arr) in l.iter().zip(r.iter()) { - let last_offset = *offsets.last().unwrap(); + let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?; + let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?; - let (l_values, r_values) = match (l_arr, r_arr) { - (Some(l_arr), Some(r_arr)) => ( - converter.convert_columns(&[l_arr])?, - converter.convert_columns(&[r_arr])?, - ), - _ => { - offsets.push(last_offset); - continue; - } - }; + match set_op { + SetOp::Union => generic_set_loop::( + l, r, &rows_l, &rows_r, field, &converter, + ), + SetOp::Intersect => generic_set_loop::( + l, r, &rows_l, &rows_r, field, &converter, + ), + } +} - let l_iter = l_values.iter().sorted().dedup(); - let values_set: HashSet<_> = l_iter.clone().collect(); - let mut rows = if set_op == SetOp::Union { - l_iter.collect() - } else { - vec![] - }; +/// Inner loop for set operations, parameterized by const generic to +/// avoid branching inside the hot loop. +fn generic_set_loop( + l: &GenericListArray, + r: &GenericListArray, + rows_l: &arrow::row::Rows, + rows_r: &arrow::row::Rows, + field: Arc, + converter: &RowConverter, +) -> Result { + let l_offsets = l.value_offsets(); + let r_offsets = r.value_offsets(); + + let mut result_offsets = Vec::with_capacity(l.len() + 1); + result_offsets.push(OffsetSize::usize_as(0)); + let initial_capacity = if IS_UNION { + // Union can include all elements from both sides + rows_l.num_rows() + } else { + // Intersect result is bounded by the smaller side + rows_l.num_rows().min(rows_r.num_rows()) + }; + + let mut final_rows = Vec::with_capacity(initial_capacity); + + // Reuse hash sets across iterations + let mut seen = HashSet::new(); + let mut lookup_set = HashSet::new(); + for i in 0..l.len() { + let last_offset = *result_offsets.last().unwrap(); - for r_val in r_values.iter().sorted().dedup() { - match set_op { - SetOp::Union => { - if !values_set.contains(&r_val) { - rows.push(r_val); - } + if l.is_null(i) || r.is_null(i) { + result_offsets.push(last_offset); + continue; + } + + let l_start = l_offsets[i].as_usize(); + let l_end = l_offsets[i + 1].as_usize(); + let r_start = r_offsets[i].as_usize(); + let r_end = r_offsets[i + 1].as_usize(); + + seen.clear(); + + if IS_UNION { + for idx in l_start..l_end { + let row = rows_l.row(idx); + if seen.insert(row) { + final_rows.push(row); } - SetOp::Intersect => { - if values_set.contains(&r_val) { - rows.push(r_val); - } + } + for idx in r_start..r_end { + let row = rows_r.row(idx); + if seen.insert(row) { + final_rows.push(row); } } - } - - offsets.push(last_offset + OffsetSize::usize_as(rows.len())); - let arrays = converter.convert_rows(rows)?; - let array = match arrays.first() { - Some(array) => Arc::clone(array), - None => { - return internal_err!("{set_op}: failed to get array from rows"); + } else { + let l_len = l_end - l_start; + let r_len = r_end - r_start; + + // Select shorter side for lookup, longer side for probing + let (lookup_rows, lookup_range, probe_rows, probe_range) = if l_len < r_len { + (rows_l, l_start..l_end, rows_r, r_start..r_end) + } else { + (rows_r, r_start..r_end, rows_l, l_start..l_end) + }; + lookup_set.clear(); + lookup_set.reserve(lookup_range.len()); + + // Build lookup table + for idx in lookup_range { + lookup_set.insert(lookup_rows.row(idx)); } - }; - new_arrays.push(array); + // Probe and emit distinct intersected rows + for idx in probe_range { + let row = probe_rows.row(idx); + if lookup_set.contains(&row) && seen.insert(row) { + final_rows.push(row); + } + } + } + result_offsets.push(last_offset + OffsetSize::usize_as(seen.len())); } - let offsets = OffsetBuffer::new(offsets.into()); - let new_arrays_ref: Vec<_> = new_arrays.iter().map(|v| v.as_ref()).collect(); - let values = if new_arrays_ref.is_empty() { + let final_values = if final_rows.is_empty() { new_empty_array(&l.value_type()) } else { - compute::concat(&new_arrays_ref)? + let arrays = converter.convert_rows(final_rows)?; + Arc::clone(&arrays[0]) }; + let arr = GenericListArray::::try_new( field, - offsets, - values, + OffsetBuffer::new(result_offsets.into()), + final_values, NullBuffer::union(l.nulls(), r.nulls()), )?; Ok(Arc::new(arr)) diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 2b98ae14d298b..8c0752d2ed577 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -6905,7 +6905,7 @@ select array_intersect(column1, column2), array_intersect(column5, column6) from array_intersect_table_1D_Boolean; ---- -[] [false, true] [false] +[] [true, false] [false] [false] [true] [true] query ??? @@ -6914,7 +6914,7 @@ select array_intersect(column1, column2), array_intersect(column5, column6) from large_array_intersect_table_1D_Boolean; ---- -[] [false, true] [false] +[] [true, false] [false] [false] [true] [true] query ??? @@ -6923,8 +6923,8 @@ select array_intersect(column1, column2), array_intersect(column5, column6) from array_intersect_table_1D_UTF8; ---- -[bc] [arrow, rust] [] -[] [arrow, datafusion, rust] [arrow, rust] +[bc] [rust, arrow] [] +[] [datafusion, rust, arrow] [rust, arrow] query ??? select array_intersect(column1, column2), @@ -6932,8 +6932,8 @@ select array_intersect(column1, column2), array_intersect(column5, column6) from large_array_intersect_table_1D_UTF8; ---- -[bc] [arrow, rust] [] -[] [arrow, datafusion, rust] [arrow, rust] +[bc] [rust, arrow] [] +[] [datafusion, rust, arrow] [rust, arrow] query ? select array_intersect(column1, column2)