Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/functions-nested/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,7 @@ name = "array_remove"
[[bench]]
harness = false
name = "array_repeat"

[[bench]]
harness = false
name = "array_set_ops"
169 changes: 169 additions & 0 deletions datafusion/functions-nested/benches/array_set_ops.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;

use arrow::array::{ArrayRef, Int64Array, ListArray};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field};
use criterion::{BenchmarkId, Criterion};
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
use datafusion_functions_nested::set_ops::{ArrayIntersect, ArrayUnion};
use rand::SeedableRng;
use rand::prelude::SliceRandom;
use rand::rngs::StdRng;
use std::collections::HashSet;
use std::hint::black_box;
use std::sync::Arc;

const NUM_ROWS: usize = 1000;
const ARRAY_SIZES: &[usize] = &[10, 50, 100];
const SEED: u64 = 42;

fn criterion_benchmark(c: &mut Criterion) {
bench_array_union(c);
bench_array_intersect(c);
}

fn invoke_udf(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) {
black_box(
udf.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(array1.clone()),
ColumnarValue::Array(array2.clone()),
],
arg_fields: vec![
Field::new("arr1", array1.data_type().clone(), false).into(),
Field::new("arr2", array2.data_type().clone(), false).into(),
],
number_rows: NUM_ROWS,
return_field: Field::new("result", array1.data_type().clone(), false).into(),
config_options: Arc::new(ConfigOptions::default()),
})
.unwrap(),
);
}

fn bench_array_union(c: &mut Criterion) {
let mut group = c.benchmark_group("array_union");
let udf = ArrayUnion::new();

for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] {
for &array_size in ARRAY_SIZES {
let (array1, array2) =
create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio);
group.bench_with_input(
BenchmarkId::new(*overlap_label, array_size),
&array_size,
|b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)),
);
}
}

group.finish();
}

fn bench_array_intersect(c: &mut Criterion) {
let mut group = c.benchmark_group("array_intersect");
let udf = ArrayIntersect::new();

for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] {
for &array_size in ARRAY_SIZES {
let (array1, array2) =
create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio);
group.bench_with_input(
BenchmarkId::new(*overlap_label, array_size),
&array_size,
|b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)),
);
}
}

group.finish();
}

fn create_arrays_with_overlap(
num_rows: usize,
array_size: usize,
overlap_ratio: f64,
) -> (ArrayRef, ArrayRef) {
assert!((0.0..=1.0).contains(&overlap_ratio));
let overlap_count = ((array_size as f64) * overlap_ratio).round() as usize;

let mut rng = StdRng::seed_from_u64(SEED);

let mut values1 = Vec::with_capacity(num_rows * array_size);
let mut values2 = Vec::with_capacity(num_rows * array_size);

for row in 0..num_rows {
let base = (row as i64) * (array_size as i64) * 2;

for i in 0..array_size {
values1.push(base + i as i64);
}

let mut positions: Vec<usize> = (0..array_size).collect();
positions.shuffle(&mut rng);

let overlap_positions: HashSet<_> =
positions[..overlap_count].iter().copied().collect();

for i in 0..array_size {
if overlap_positions.contains(&i) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slice::contains() is O(n) (linear search). Using a HashSet would be O(1), but create_arrays_with_overlap() is called before group.bench_with_input(...), so maybe it is OK.

let overlap_positions: std::collections::HashSet<_> =
            positions[..overlap_count].iter().copied().collect();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

values2.push(base + i as i64);
} else {
values2.push(base + array_size as i64 + i as i64);
}
}
}

let values1 = Int64Array::from(values1);
let values2 = Int64Array::from(values2);

let field = Arc::new(Field::new("item", DataType::Int64, true));

let offsets = (0..=num_rows)
.map(|i| (i * array_size) as i32)
.collect::<Vec<i32>>();

let array1 = Arc::new(
ListArray::try_new(
field.clone(),
OffsetBuffer::new(offsets.clone().into()),
Arc::new(values1),
None,
)
.unwrap(),
);

let array2 = Arc::new(
ListArray::try_new(
field,
OffsetBuffer::new(offsets.into()),
Arc::new(values2),
None,
)
.unwrap(),
);

(array1, array2)
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
150 changes: 102 additions & 48 deletions datafusion/functions-nested/src/set_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,17 @@ impl ScalarUDFImpl for ArrayUnion {
)
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub(super) struct ArrayIntersect {
pub struct ArrayIntersect {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now public to be able to use it in the benchmark test (a separate crate).
Maybe annotate it with #[doc(hidden)] to hide it from the end users, since it is not really supposed to be part of the public APIs ?!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArrayIntersect already has #[user_doc], and keeping it visible aligns with how other user-facing SQL functions are exposed?

signature: Signature,
aliases: Vec<String>,
}

impl Default for ArrayIntersect {
fn default() -> Self {
Self::new()
}
}

impl ArrayIntersect {
pub fn new() -> Self {
Self {
Expand Down Expand Up @@ -358,69 +364,117 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
"{set_op:?} is not implemented for '{l:?}' and '{r:?}'"
);

let mut offsets = vec![OffsetSize::usize_as(0)];
let mut new_arrays = vec![];
// Convert all values to rows in batch for performance.
let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
for (l_arr, r_arr) in l.iter().zip(r.iter()) {
let last_offset = *offsets.last().unwrap();
let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?;
let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?;

let (l_values, r_values) = match (l_arr, r_arr) {
(Some(l_arr), Some(r_arr)) => (
converter.convert_columns(&[l_arr])?,
converter.convert_columns(&[r_arr])?,
),
_ => {
offsets.push(last_offset);
continue;
}
};
match set_op {
SetOp::Union => generic_set_loop::<OffsetSize, true>(
l, r, &rows_l, &rows_r, field, &converter,
),
SetOp::Intersect => generic_set_loop::<OffsetSize, false>(
l, r, &rows_l, &rows_r, field, &converter,
),
}
}

let l_iter = l_values.iter().sorted().dedup();
let values_set: HashSet<_> = l_iter.clone().collect();
let mut rows = if set_op == SetOp::Union {
l_iter.collect()
} else {
vec![]
};
/// Inner loop for set operations, parameterized by const generic to
/// avoid branching inside the hot loop.
fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
l: &GenericListArray<OffsetSize>,
r: &GenericListArray<OffsetSize>,
rows_l: &arrow::row::Rows,
rows_r: &arrow::row::Rows,
field: Arc<Field>,
converter: &RowConverter,
) -> Result<ArrayRef> {
let l_offsets = l.value_offsets();
let r_offsets = r.value_offsets();

let mut result_offsets = Vec::with_capacity(l.len() + 1);
result_offsets.push(OffsetSize::usize_as(0));
let initial_capacity = if IS_UNION {
// Union can include all elements from both sides
rows_l.num_rows()
} else {
// Intersect result is bounded by the smaller side
rows_l.num_rows().min(rows_r.num_rows())
};

let mut final_rows = Vec::with_capacity(initial_capacity);

// Reuse hash sets across iterations
let mut seen = HashSet::new();
let mut lookup_set = HashSet::new();
for i in 0..l.len() {
let last_offset = *result_offsets.last().unwrap();

for r_val in r_values.iter().sorted().dedup() {
match set_op {
SetOp::Union => {
if !values_set.contains(&r_val) {
rows.push(r_val);
}
if l.is_null(i) || r.is_null(i) {
result_offsets.push(last_offset);
continue;
}

let l_start = l_offsets[i].as_usize();
let l_end = l_offsets[i + 1].as_usize();
let r_start = r_offsets[i].as_usize();
let r_end = r_offsets[i + 1].as_usize();

seen.clear();

if IS_UNION {
for idx in l_start..l_end {
let row = rows_l.row(idx);
if seen.insert(row) {
final_rows.push(row);
}
SetOp::Intersect => {
if values_set.contains(&r_val) {
rows.push(r_val);
}
}
for idx in r_start..r_end {
let row = rows_r.row(idx);
if seen.insert(row) {
final_rows.push(row);
}
}
}

offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
let arrays = converter.convert_rows(rows)?;
let array = match arrays.first() {
Some(array) => Arc::clone(array),
None => {
return internal_err!("{set_op}: failed to get array from rows");
} else {
let l_len = l_end - l_start;
let r_len = r_end - r_start;

// Select shorter side for lookup, longer side for probing
let (lookup_rows, lookup_range, probe_rows, probe_range) = if l_len < r_len {
(rows_l, l_start..l_end, rows_r, r_start..r_end)
} else {
(rows_r, r_start..r_end, rows_l, l_start..l_end)
};
lookup_set.clear();
lookup_set.reserve(lookup_range.len());

// Build lookup table
for idx in lookup_range {
lookup_set.insert(lookup_rows.row(idx));
}
};

new_arrays.push(array);
// Probe and emit distinct intersected rows
for idx in probe_range {
let row = probe_rows.row(idx);
if lookup_set.contains(&row) && seen.insert(row) {
final_rows.push(row);
}
}
}
result_offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
}

let offsets = OffsetBuffer::new(offsets.into());
let new_arrays_ref: Vec<_> = new_arrays.iter().map(|v| v.as_ref()).collect();
let values = if new_arrays_ref.is_empty() {
let final_values = if final_rows.is_empty() {
new_empty_array(&l.value_type())
} else {
compute::concat(&new_arrays_ref)?
let arrays = converter.convert_rows(final_rows)?;
Arc::clone(&arrays[0])
};

let arr = GenericListArray::<OffsetSize>::try_new(
field,
offsets,
values,
OffsetBuffer::new(result_offsets.into()),
final_values,
NullBuffer::union(l.nulls(), r.nulls()),
)?;
Ok(Arc::new(arr))
Expand Down
Loading