Skip to content
208 changes: 148 additions & 60 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,33 +392,22 @@ fn hash_generic_byte_view_array<T: ByteViewType>(
}
}

/// Applies the precomputed hash of one dictionary value to a single output slot.
///
/// * `hash` - the output slot to update.
/// * `dict_hashes` - hashes indexed by dictionary value position.
/// * `dict_values` - the dictionary values, consulted only for validity at `idx`.
/// * `idx` - position of the dictionary value referenced by the current key.
/// * `multi_col` - when `true`, the value hash is combined with the hash already
///   in the slot; when `false`, the slot is overwritten with the value hash.
///
/// If the dictionary value at `idx` is not valid, the slot is left untouched.
#[cfg(not(feature = "force_hash_collisions"))]
#[inline]
fn update_hash_for_dict_key(
    hash: &mut u64,
    dict_hashes: &[u64],
    dict_values: &dyn Array,
    idx: usize,
    multi_col: bool,
) {
    // An invalid (null) dictionary value contributes nothing to the hash.
    if !dict_values.is_valid(idx) {
        return;
    }

    let value_hash = dict_hashes[idx];
    *hash = if multi_col {
        combine_hashes(value_hash, *hash)
    } else {
        value_hash
    };
}

/// Hash the values in a dictionary array
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_dictionary<K: ArrowDictionaryKeyType>(
/// Hash dictionary array with compile-time specialization for null handling.
///
/// Uses const generics to eliminate runtime branching in the hot loop:
/// - `HAS_NULL_KEYS`: Whether to check for null dictionary keys
/// - `HAS_NULL_VALUES`: Whether to check for null dictionary values
/// - `MULTI_COL`: Whether to combine with existing hash (true) or initialize (false)
#[inline(never)]
fn hash_dictionary_inner<
K: ArrowDictionaryKeyType,
const HAS_NULL_KEYS: bool,
const HAS_NULL_VALUES: bool,
const MULTI_COL: bool,
>(
array: &DictionaryArray<K>,
random_state: &RandomState,
hashes_buffer: &mut [u64],
multi_col: bool,
) -> Result<()> {
// Hash each dictionary value once, and then use that computed
// hash for each key value to avoid a potentially expensive
Expand All @@ -427,22 +416,91 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
let mut dict_hashes = vec![0; dict_values.len()];
create_hashes([dict_values], random_state, &mut dict_hashes)?;

// combine hash for each index in values
for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
if let Some(key) = key {
if HAS_NULL_KEYS {
for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
if let Some(key) = key {
let idx = key.as_usize();
if !HAS_NULL_VALUES || dict_values.is_valid(idx) {
if MULTI_COL {
*hash = combine_hashes(dict_hashes[idx], *hash);
} else {
*hash = dict_hashes[idx];
}
}
}
}
} else {
for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().values()) {
let idx = key.as_usize();
update_hash_for_dict_key(
hash,
&dict_hashes,
dict_values.as_ref(),
idx,
multi_col,
);
} // no update for Null key
if !HAS_NULL_VALUES || dict_values.is_valid(idx) {
if MULTI_COL {
*hash = combine_hashes(dict_hashes[idx], *hash);
} else {
*hash = dict_hashes[idx];
}
}
}
}
Ok(())
}

/// Hash the values in a dictionary array
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_dictionary<K: ArrowDictionaryKeyType>(
array: &DictionaryArray<K>,
random_state: &RandomState,
hashes_buffer: &mut [u64],
multi_col: bool,
) -> Result<()> {
let has_null_keys = array.keys().null_count() != 0;
let has_null_values = array.values().null_count() != 0;

// Dispatcher based on null presence and multi-column mode
// Should reduce branching within hot loops
match (has_null_keys, has_null_values, multi_col) {
(false, false, false) => hash_dictionary_inner::<K, false, false, false>(
array,
random_state,
hashes_buffer,
),
(false, false, true) => hash_dictionary_inner::<K, false, false, true>(
array,
random_state,
hashes_buffer,
),
(false, true, false) => hash_dictionary_inner::<K, false, true, false>(
array,
random_state,
hashes_buffer,
),
(false, true, true) => hash_dictionary_inner::<K, false, true, true>(
array,
random_state,
hashes_buffer,
),
(true, false, false) => hash_dictionary_inner::<K, true, false, false>(
array,
random_state,
hashes_buffer,
),
(true, false, true) => hash_dictionary_inner::<K, true, false, true>(
array,
random_state,
hashes_buffer,
),
(true, true, false) => hash_dictionary_inner::<K, true, true, false>(
array,
random_state,
hashes_buffer,
),
(true, true, true) => hash_dictionary_inner::<K, true, true, true>(
array,
random_state,
hashes_buffer,
),
}
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_struct_array(
array: &StructArray,
Expand All @@ -452,19 +510,21 @@ fn hash_struct_array(
let nulls = array.nulls();
let row_len = array.len();

let valid_row_indices: Vec<usize> = if let Some(nulls) = nulls {
nulls.valid_indices().collect()
} else {
(0..row_len).collect()
};

// Create hashes for each row that combine the hashes over all the columns at that row.
let mut values_hashes = vec![0u64; row_len];
create_hashes(array.columns(), random_state, &mut values_hashes)?;

for i in valid_row_indices {
let hash = &mut hashes_buffer[i];
*hash = combine_hashes(*hash, values_hashes[i]);
// Separate paths to avoid allocating Vec when there are no nulls
if let Some(nulls) = nulls {
for i in nulls.valid_indices() {
let hash = &mut hashes_buffer[i];
*hash = combine_hashes(*hash, values_hashes[i]);
}
} else {
for i in 0..row_len {
let hash = &mut hashes_buffer[i];
*hash = combine_hashes(*hash, values_hashes[i]);
}
}

Ok(())
Expand Down Expand Up @@ -663,12 +723,17 @@ fn hash_fixed_list_array(
Ok(())
}

/// Inner hash function for RunArray
#[inline(never)]
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_run_array<R: RunEndIndexType>(
fn hash_run_array_inner<
R: RunEndIndexType,
const HAS_NULL_VALUES: bool,
const REHASH: bool,
>(
array: &RunArray<R>,
random_state: &RandomState,
hashes_buffer: &mut [u64],
rehash: bool,
) -> Result<()> {
// We find the relevant runs that cover potentially sliced arrays, so we can only hash those
// values. Then we find the runs that refer to the original runs and ensure that we apply
Expand Down Expand Up @@ -706,25 +771,23 @@ fn hash_run_array<R: RunEndIndexType>(
.iter()
.enumerate()
{
let is_null_value = sliced_values.is_null(adjusted_physical_index);
let absolute_run_end = absolute_run_end.as_usize();

let end_in_slice = (absolute_run_end - array_offset).min(array_len);

if rehash {
if !is_null_value {
let value_hash = values_hashes[adjusted_physical_index];
for hash in hashes_buffer
.iter_mut()
.take(end_in_slice)
.skip(start_in_slice)
{
*hash = combine_hashes(value_hash, *hash);
}
if HAS_NULL_VALUES && sliced_values.is_null(adjusted_physical_index) {
start_in_slice = end_in_slice;
continue;
}

let value_hash = values_hashes[adjusted_physical_index];
let run_slice = &mut hashes_buffer[start_in_slice..end_in_slice];

if REHASH {
for hash in run_slice.iter_mut() {
*hash = combine_hashes(value_hash, *hash);
}
} else {
let value_hash = values_hashes[adjusted_physical_index];
hashes_buffer[start_in_slice..end_in_slice].fill(value_hash);
run_slice.fill(value_hash);
}

start_in_slice = end_in_slice;
Expand All @@ -733,6 +796,31 @@ fn hash_run_array<R: RunEndIndexType>(
Ok(())
}

/// Hash the values in a run-end encoded array.
///
/// Checks whether the run values contain any nulls and forwards to the
/// `hash_run_array_inner` instantiation whose const parameters
/// (`HAS_NULL_VALUES`, `REHASH`) match that fact and the `rehash` flag.
/// The result of that call is returned unchanged.
///
/// * `array` - the run array to hash.
/// * `random_state` - hashing state passed through to the inner function.
/// * `hashes_buffer` - output slots, passed through to the inner function.
/// * `rehash` - selects the `REHASH` const parameter.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_run_array<R: RunEndIndexType>(
    array: &RunArray<R>,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
    rehash: bool,
) -> Result<()> {
    let values_have_nulls = array.values().null_count() > 0;

    // Pick the instantiation up front so both flags are compile-time
    // constants inside the inner function.
    if values_have_nulls {
        if rehash {
            hash_run_array_inner::<R, true, true>(array, random_state, hashes_buffer)
        } else {
            hash_run_array_inner::<R, true, false>(array, random_state, hashes_buffer)
        }
    } else if rehash {
        hash_run_array_inner::<R, false, true>(array, random_state, hashes_buffer)
    } else {
        hash_run_array_inner::<R, false, false>(array, random_state, hashes_buffer)
    }
}

/// Internal helper function that hashes a single array and either initializes or combines
/// the hash values in the buffer.
#[cfg(not(feature = "force_hash_collisions"))]
Expand Down