Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,11 @@ config_namespace! {
///
/// Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct
/// partitions is less than the target_partitions.
///
/// Note for partitioned hash join dynamic filtering:
/// preserving file partitions can allow partition-index routing (`i -> i`) instead of
/// CASE-hash routing, but this assumes build/probe partition indices stay aligned for
/// dynamic filter consumers.
Comment on lines +1002 to +1003
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// CASE-hash routing, but this assumes build/probe partition indices stay aligned for
/// dynamic filter consumers.
/// CASE-hash routing, but this assumes build/probe partition indices stay aligned, otherwise the query might have correctness problems.

I would also add a small example to clarify what “aligned” means -- for example, ranges 0–5 on partition 0 on both the build and probe sides, and so on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. An example would explain things more clearly here. Monodraw is a good tool for this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do

pub preserve_file_partitions: usize, default = 0

/// Should DataFusion repartition data using the partitions keys to execute window
Expand Down
302 changes: 296 additions & 6 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{CsvSource, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::ScalarValue;
use datafusion_common::config::CsvOptions;
use datafusion_common::error::Result;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{NullEquality, ScalarValue};
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_expr::{JoinType, Operator};
Expand All @@ -61,14 +61,18 @@ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::ExecutionPlan;
use datafusion_physical_plan::expressions::col;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::joins::utils::JoinOn;
use datafusion_physical_plan::joins::{
DynamicFilterRoutingMode, HashJoinExec, HashJoinExecBuilder, PartitionMode,
utils::JoinOn,
};
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties, Statistics,
displayable,
DisplayAs, DisplayFormatType, ExecutionPlanProperties, Partitioning, PlanProperties,
Statistics, displayable,
};
use insta::Settings;

Expand Down Expand Up @@ -366,6 +370,64 @@ fn hash_join_exec(
.unwrap()
}

/// Builds a `Partitioned`-mode hash join over `left` and `right` with an
/// explicitly chosen dynamic-filter routing mode, so tests can start from a
/// known (possibly deliberately wrong) mode and observe the optimizer fix it.
fn partitioned_hash_join_exec_with_routing_mode(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    join_on: &JoinOn,
    join_type: &JoinType,
    routing_mode: DynamicFilterRoutingMode,
) -> Arc<dyn ExecutionPlan> {
    // Configure the builder step by step; `build()` only fails on invalid
    // join configuration, which these fixed test inputs never produce.
    let builder = HashJoinExecBuilder::new(left, right, join_on.clone(), *join_type)
        .with_partition_mode(PartitionMode::Partitioned)
        .with_null_equality(NullEquality::NullEqualsNothing)
        .with_dynamic_filter_routing_mode(routing_mode);
    Arc::new(builder.build().unwrap())
}

/// Depth-first search for the first `HashJoinExec` in `plan`'s subtree.
///
/// Returns the join together with the number of its *direct* children that
/// are hash-partitioned `RepartitionExec` nodes, or `None` if the subtree
/// contains no hash join.
fn first_hash_join_and_direct_hash_repartition_children(
    plan: &Arc<dyn ExecutionPlan>,
) -> Option<(&HashJoinExec, usize)> {
    match plan.as_any().downcast_ref::<HashJoinExec>() {
        Some(hash_join) => {
            // Count only immediate children that are Hash-partitioned
            // repartitions; deeper repartitions are intentionally ignored.
            let direct_hash_repartition_children = hash_join
                .children()
                .into_iter()
                .filter(|child| {
                    matches!(
                        child
                            .as_any()
                            .downcast_ref::<RepartitionExec>()
                            .map(|repartition| repartition.partitioning()),
                        Some(Partitioning::Hash(_, _))
                    )
                })
                .count();
            Some((hash_join, direct_hash_repartition_children))
        }
        // Not a hash join: recurse into children and take the first match.
        None => plan
            .children()
            .into_iter()
            .find_map(first_hash_join_and_direct_hash_repartition_children),
    }
}

/// Wraps `input` in a `RepartitionExec` that hash-partitions on the named
/// column into `partition_count` output partitions.
fn hash_repartition_on_column(
    input: Arc<dyn ExecutionPlan>,
    column_name: &str,
    partition_count: usize,
) -> Arc<dyn ExecutionPlan> {
    // Resolve the column against the input schema; test schemas always
    // contain the requested column, hence the unwraps.
    let column = Column::new_with_schema(column_name, &input.schema()).unwrap();
    let hash_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(column)];
    let partitioning = Partitioning::Hash(hash_exprs, partition_count);
    Arc::new(RepartitionExec::try_new(input, partitioning).unwrap())
}

fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let predicate = Arc::new(BinaryExpr::new(
col("c", &schema()).unwrap(),
Expand Down Expand Up @@ -405,6 +467,23 @@ fn ensure_distribution_helper(
ensure_distribution(distribution_context, &config).map(|item| item.data.plan)
}

/// Like [`ensure_distribution_helper`] but applies `ensure_distribution`
/// bottom-up via `transform_up` instead of the default traversal.
fn ensure_distribution_helper_transform_up(
    plan: Arc<dyn ExecutionPlan>,
    target_partitions: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Disable unrelated optimizer knobs so the test observes only the
    // effects of distribution enforcement itself.
    let mut config = ConfigOptions::new();
    config.execution.target_partitions = target_partitions;
    config.optimizer.enable_round_robin_repartition = false;
    config.optimizer.repartition_file_scans = false;
    config.optimizer.repartition_file_min_size = 1024;
    config.optimizer.prefer_existing_sort = false;

    DistributionContext::new_default(plan)
        .transform_up(|node| ensure_distribution(node, &config))
        .map(|transformed| transformed.data.plan)
}

fn test_suite_default_config_options() -> ConfigOptions {
let mut config = ConfigOptions::new();

Expand Down Expand Up @@ -737,6 +816,210 @@ fn multi_hash_joins() -> Result<()> {
Ok(())
}

#[test]
fn enforce_distribution_switches_to_partition_index_without_hash_repartition()
-> Result<()> {
    // Both sides are single-partition parquet scans, so EnforceDistribution
    // should not need to insert any hash repartition under the join.
    let left = parquet_exec();
    let right = parquet_exec();

    // Equi-join on column "a" from both sides.
    let join_on = vec![(
        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
        Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
    )];

    // Start with the wrong mode and verify EnforceDistribution corrects it when hash repartition
    // is not inserted.
    let join = partitioned_hash_join_exec_with_routing_mode(
        left,
        right,
        &join_on,
        &JoinType::Inner,
        DynamicFilterRoutingMode::CaseHash,
    );

    let optimized = ensure_distribution_helper_transform_up(join, 1)?;
    // The optimized plan contains no RepartitionExec at all.
    assert_plan!(optimized, @r"
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
    ");
    let (hash_join, direct_hash_repartition_children) =
        first_hash_join_and_direct_hash_repartition_children(&optimized)
            .expect("expected HashJoinExec");

    // With no hash repartition inserted, routing should be switched to the
    // cheaper partition-index (`i -> i`) mode.
    assert_eq!(
        hash_join.dynamic_filter_routing_mode,
        DynamicFilterRoutingMode::PartitionIndex,
    );
    assert_eq!(direct_hash_repartition_children, 0);

    Ok(())
}

#[test]
fn enforce_distribution_uses_case_hash_if_any_child_has_hash_repartition() -> Result<()> {
    // Left side has two file groups (two partitions); right side has one.
    let left = parquet_exec_multiple();
    let right = parquet_exec();

    // Equi-join on column "a" from both sides.
    let join_on = vec![(
        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
        Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
    )];

    // One side starts with multiple partitions while target is 1. EnforceDistribution inserts a
    // hash repartition on one child, and routing mode should be CASE routing.
    let join = partitioned_hash_join_exec_with_routing_mode(
        left,
        right,
        &join_on,
        &JoinType::Inner,
        DynamicFilterRoutingMode::PartitionIndex,
    );

    let optimized = ensure_distribution_helper_transform_up(join, 1)?;
    // A Hash repartition now sits directly under the join's left input.
    assert_plan!(optimized, @r"
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
      RepartitionExec: partitioning=Hash([a@0], 1), input_partitions=2
        DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], file_type=parquet
      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
    ");
    let (hash_join, direct_hash_repartition_children) =
        first_hash_join_and_direct_hash_repartition_children(&optimized)
            .expect("expected HashJoinExec");

    // Because a hash repartition was inserted, the optimizer must fall back
    // to CASE-hash routing for dynamic filters.
    assert_eq!(
        hash_join.dynamic_filter_routing_mode,
        DynamicFilterRoutingMode::CaseHash,
    );
    assert_eq!(direct_hash_repartition_children, 1);

    Ok(())
}

#[test]
fn enforce_distribution_uses_case_hash_with_hidden_hash_repartition_through_aggregate()
-> Result<()> {
    // Left: hash repartition hidden below a projection (not directly under the join).
    let left = projection_exec_with_alias(
        hash_repartition_on_column(parquet_exec(), "a", 4),
        vec![("a".to_string(), "a".to_string())],
    );

    // Right: hash repartition hidden below an aggregate.
    let right = aggregate_exec_with_alias(
        hash_repartition_on_column(parquet_exec(), "a", 4),
        vec![("a".to_string(), "a".to_string())],
    );

    // Equi-join on column "a" from both sides.
    let join_on = vec![(
        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
        Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
    )];

    // Both sides are already hash repartitioned, but the hash repartitions are below other
    // operators not directly under the join. EnforceDistribution should choose CASE routing.
    let join = partitioned_hash_join_exec_with_routing_mode(
        left,
        right,
        &join_on,
        &JoinType::Inner,
        DynamicFilterRoutingMode::PartitionIndex,
    );

    let optimized = ensure_distribution_helper_transform_up(join, 4)?;
    assert_plan!(optimized, @r"
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
      RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
        ProjectionExec: expr=[a@0 as a]
          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
      AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
        RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
          AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
    ");
    let (hash_join, direct_hash_repartition_children) =
        first_hash_join_and_direct_hash_repartition_children(&optimized)
            .expect("expected HashJoinExec");

    // Hash repartitioning anywhere on the input paths forces CASE routing,
    // even though only one repartition is a *direct* child of the join.
    assert_eq!(
        hash_join.dynamic_filter_routing_mode,
        DynamicFilterRoutingMode::CaseHash,
    );
    assert_eq!(direct_hash_repartition_children, 1);

    Ok(())
}

#[test]
fn enforce_distribution_ignores_hash_repartition_off_dynamic_filter_path() -> Result<()> {
    // This hash repartition is in the probe subtree but off the dynamic filter pushdown path
    // because the top filter references `a` while this branch only exposes `a2`.
    let lower_left = projection_exec_with_alias(
        hash_repartition_on_column(parquet_exec(), "a", 4),
        vec![("a".to_string(), "a2".to_string())],
    );
    let lower_right: Arc<dyn ExecutionPlan> = parquet_exec();

    // Lower join matches the renamed `a2` against the right side's `a`.
    let lower_join_on = vec![(
        Arc::new(Column::new_with_schema("a2", &lower_left.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
        Arc::new(Column::new_with_schema("a", &lower_right.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
    )];

    // CollectLeft lower join: its build side is broadcast, so it imposes no
    // partitioning requirement of its own.
    let lower_join: Arc<dyn ExecutionPlan> = Arc::new(
        HashJoinExecBuilder::new(
            lower_left.clone(),
            lower_right.clone(),
            lower_join_on,
            JoinType::Inner,
        )
        .with_partition_mode(PartitionMode::CollectLeft)
        .with_null_equality(NullEquality::NullEqualsNothing)
        .build()
        .unwrap(),
    );

    // Top join probes the lower join's output on `a`.
    let left = parquet_exec();
    let join_on = vec![(
        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
        Arc::new(Column::new_with_schema("a", &lower_join.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
    )];

    // Start from CaseHash and check the optimizer upgrades to PartitionIndex.
    let join = partitioned_hash_join_exec_with_routing_mode(
        left,
        lower_join,
        &join_on,
        &JoinType::Inner,
        DynamicFilterRoutingMode::CaseHash,
    );

    let optimized = ensure_distribution_helper_transform_up(join, 1)?;
    assert_plan!(optimized, @r"
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@1)]
      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a2@0, a@0)]
        ProjectionExec: expr=[a@0 as a2]
          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
    ");
    let (hash_join, _) = first_hash_join_and_direct_hash_repartition_children(&optimized)
        .expect("expected HashJoinExec");

    // The hash repartition under the alias projection is off the dynamic
    // filter path, so PartitionIndex routing remains valid.
    assert_eq!(
        hash_join.dynamic_filter_routing_mode,
        DynamicFilterRoutingMode::PartitionIndex
    );

    Ok(())
}

#[test]
fn multi_joins_after_alias() -> Result<()> {
let left = parquet_exec();
Expand Down Expand Up @@ -3647,8 +3930,15 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
// Create distribution context
let dist_context = DistributionContext::new(
spm_exec,
true,
vec![DistributionContext::new(parquet_exec, false, vec![])],
DistFlags {
dist_changing: true,
..Default::default()
},
vec![DistributionContext::new(
parquet_exec,
DistFlags::default(),
vec![],
)],
);

// Apply the function
Expand Down
Loading