diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0ba587bbc6961..77210cbc3bb95 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -996,6 +996,11 @@ config_namespace! { /// /// Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct /// partitions is less than the target_partitions. + /// + /// Note for partitioned hash join dynamic filtering: + /// preserving file partitions can allow partition-index routing (`i -> i`) instead of + /// CASE-hash routing, but this assumes build/probe partition indices stay aligned for + /// dynamic filter consumers. pub preserve_file_partitions: usize, default = 0 /// Should DataFusion repartition data using the partitions keys to execute window diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 94ae82a9ad755..350cba410059b 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -37,10 +37,10 @@ use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{CsvSource, ParquetSource}; use datafusion::datasource::source::DataSourceExec; use datafusion::prelude::{SessionConfig, SessionContext}; -use datafusion_common::ScalarValue; use datafusion_common::config::CsvOptions; use datafusion_common::error::Result; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::{NullEquality, ScalarValue}; use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_expr::{JoinType, Operator}; @@ -61,14 +61,18 @@ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::execution_plan::ExecutionPlan; use datafusion_physical_plan::expressions::col; use datafusion_physical_plan::filter::FilterExec; -use datafusion_physical_plan::joins::utils::JoinOn; +use datafusion_physical_plan::joins::{ + DynamicFilterRoutingMode, HashJoinExec, HashJoinExecBuilder, PartitionMode, + utils::JoinOn, +}; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; +use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties, Statistics, - displayable, + DisplayAs, DisplayFormatType, ExecutionPlanProperties, Partitioning, PlanProperties, + Statistics, displayable, }; use insta::Settings; @@ -366,6 +370,64 @@ fn hash_join_exec( .unwrap() } +fn partitioned_hash_join_exec_with_routing_mode( + left: Arc, + right: Arc, + join_on: &JoinOn, + join_type: &JoinType, + routing_mode: DynamicFilterRoutingMode, +) -> Arc { + Arc::new( + HashJoinExecBuilder::new(left, right, join_on.clone(), *join_type) + .with_partition_mode(PartitionMode::Partitioned) + .with_null_equality(NullEquality::NullEqualsNothing) + .with_dynamic_filter_routing_mode(routing_mode) + .build() + .unwrap(), + ) +} + +fn first_hash_join_and_direct_hash_repartition_children( + plan: &Arc, +) -> Option<(&HashJoinExec, usize)> { + if let Some(hash_join) = plan.as_any().downcast_ref::() { + let direct_hash_repartition_children = hash_join + .children() + .into_iter() + .filter(|child| { + child + .as_any() + .downcast_ref::() + .is_some_and(|repartition| { + matches!(repartition.partitioning(), Partitioning::Hash(_, _)) + }) + }) + .count(); + return Some((hash_join, direct_hash_repartition_children)); + } + + for child in plan.children() { + if let Some(result) = first_hash_join_and_direct_hash_repartition_children(child) + { + return Some(result); + } + } + None +} + +fn hash_repartition_on_column( + input: Arc, + column_name: &str, + partition_count: usize, +) -> Arc { + let expr = Arc::new(Column::new_with_schema(column_name, &input.schema()).unwrap()) + as Arc; + Arc::new( + RepartitionExec::try_new(input, Partitioning::Hash(vec![expr], partition_count)) + .unwrap(), + ) +} + fn filter_exec(input: Arc) -> Arc { let predicate = Arc::new(BinaryExpr::new( col("c", &schema()).unwrap(), @@ -405,6 +467,23 @@ fn ensure_distribution_helper( ensure_distribution(distribution_context, &config).map(|item| item.data.plan) } +/// Like [`ensure_distribution_helper`] but uses bottom-up `transform_up`. +fn ensure_distribution_helper_transform_up( + plan: Arc, + target_partitions: usize, +) -> Result> { + let distribution_context = DistributionContext::new_default(plan); + let mut config = ConfigOptions::new(); + config.execution.target_partitions = target_partitions; + config.optimizer.enable_round_robin_repartition = false; + config.optimizer.repartition_file_scans = false; + config.optimizer.repartition_file_min_size = 1024; + config.optimizer.prefer_existing_sort = false; + distribution_context + .transform_up(|node| ensure_distribution(node, &config)) + .map(|item| item.data.plan) +} + fn test_suite_default_config_options() -> ConfigOptions { let mut config = ConfigOptions::new(); @@ -737,6 +816,210 @@ fn multi_hash_joins() -> Result<()> { Ok(()) } +#[test] +fn enforce_distribution_switches_to_partition_index_without_hash_repartition() +-> Result<()> { + let left = parquet_exec(); + let right = parquet_exec(); + + let join_on = vec![( + Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) + as Arc, + Arc::new(Column::new_with_schema("a", &right.schema()).unwrap()) + as Arc, + )]; + + // Start with the wrong mode and verify EnforceDistribution corrects it when hash repartition + // is not inserted. + let join = partitioned_hash_join_exec_with_routing_mode( + left, + right, + &join_on, + &JoinType::Inner, + DynamicFilterRoutingMode::CaseHash, + ); + + let optimized = ensure_distribution_helper_transform_up(join, 1)?; + assert_plan!(optimized, @r" + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); + let (hash_join, direct_hash_repartition_children) = + first_hash_join_and_direct_hash_repartition_children(&optimized) + .expect("expected HashJoinExec"); + + assert_eq!( + hash_join.dynamic_filter_routing_mode, + DynamicFilterRoutingMode::PartitionIndex, + ); + assert_eq!(direct_hash_repartition_children, 0); + + Ok(()) +} + +#[test] +fn enforce_distribution_uses_case_hash_if_any_child_has_hash_repartition() -> Result<()> { + let left = parquet_exec_multiple(); + let right = parquet_exec(); + + let join_on = vec![( + Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) + as Arc, + Arc::new(Column::new_with_schema("a", &right.schema()).unwrap()) + as Arc, + )]; + + // One side starts with multiple partitions while target is 1. EnforceDistribution inserts a + // hash repartition on one child, and routing mode should be CASE routing. + let join = partitioned_hash_join_exec_with_routing_mode( + left, + right, + &join_on, + &JoinType::Inner, + DynamicFilterRoutingMode::PartitionIndex, + ); + + let optimized = ensure_distribution_helper_transform_up(join, 1)?; + assert_plan!(optimized, @r" + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)] + RepartitionExec: partitioning=Hash([a@0], 1), input_partitions=2 + DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); + let (hash_join, direct_hash_repartition_children) = + first_hash_join_and_direct_hash_repartition_children(&optimized) + .expect("expected HashJoinExec"); + + assert_eq!( + hash_join.dynamic_filter_routing_mode, + DynamicFilterRoutingMode::CaseHash, + ); + assert_eq!(direct_hash_repartition_children, 1); + + Ok(()) +} + +#[test] +fn enforce_distribution_uses_case_hash_with_hidden_hash_repartition_through_aggregate() +-> Result<()> { + let left = projection_exec_with_alias( + hash_repartition_on_column(parquet_exec(), "a", 4), + vec![("a".to_string(), "a".to_string())], + ); + + let right = aggregate_exec_with_alias( + hash_repartition_on_column(parquet_exec(), "a", 4), + vec![("a".to_string(), "a".to_string())], + ); + + let join_on = vec![( + Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) + as Arc, + Arc::new(Column::new_with_schema("a", &right.schema()).unwrap()) + as Arc, + )]; + + // Both sides are already hash repartitioned, but the hash repartitions are below other + // operators not directly under the join. EnforceDistribution should choose CASE routing. + let join = partitioned_hash_join_exec_with_routing_mode( + left, + right, + &join_on, + &JoinType::Inner, + DynamicFilterRoutingMode::PartitionIndex, + ); + + let optimized = ensure_distribution_helper_transform_up(join, 4)?; + assert_plan!(optimized, @r" + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)] + RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 + ProjectionExec: expr=[a@0 as a] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[] + RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 + AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); + let (hash_join, direct_hash_repartition_children) = + first_hash_join_and_direct_hash_repartition_children(&optimized) + .expect("expected HashJoinExec"); + + assert_eq!( + hash_join.dynamic_filter_routing_mode, + DynamicFilterRoutingMode::CaseHash, + ); + assert_eq!(direct_hash_repartition_children, 1); + + Ok(()) +} + +#[test] +fn enforce_distribution_ignores_hash_repartition_off_dynamic_filter_path() -> Result<()> { + // This hash repartition is in the probe subtree but off the dynamic filter pushdown path + // because the top filter references `a` while this branch only exposes `a2`. + let lower_left = projection_exec_with_alias( + hash_repartition_on_column(parquet_exec(), "a", 4), + vec![("a".to_string(), "a2".to_string())], + ); + let lower_right: Arc = parquet_exec(); + + let lower_join_on = vec![( + Arc::new(Column::new_with_schema("a2", &lower_left.schema()).unwrap()) + as Arc, + Arc::new(Column::new_with_schema("a", &lower_right.schema()).unwrap()) + as Arc, + )]; + + let lower_join: Arc = Arc::new( + HashJoinExecBuilder::new( + lower_left.clone(), + lower_right.clone(), + lower_join_on, + JoinType::Inner, + ) + .with_partition_mode(PartitionMode::CollectLeft) + .with_null_equality(NullEquality::NullEqualsNothing) + .build() + .unwrap(), + ); + + let left = parquet_exec(); + let join_on = vec![( + Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) + as Arc, + Arc::new(Column::new_with_schema("a", &lower_join.schema()).unwrap()) + as Arc, + )]; + + let join = partitioned_hash_join_exec_with_routing_mode( + left, + lower_join, + &join_on, + &JoinType::Inner, + DynamicFilterRoutingMode::CaseHash, + ); + + let optimized = ensure_distribution_helper_transform_up(join, 1)?; + assert_plan!(optimized, @r" + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@1)] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a2@0, a@0)] + ProjectionExec: expr=[a@0 as a2] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "); + let (hash_join, _) = first_hash_join_and_direct_hash_repartition_children(&optimized) + .expect("expected HashJoinExec"); + + assert_eq!( + hash_join.dynamic_filter_routing_mode, + DynamicFilterRoutingMode::PartitionIndex + ); + + Ok(()) +} + #[test] fn multi_joins_after_alias() -> Result<()> { let left = parquet_exec(); @@ -3647,8 +3930,15 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> { // Create distribution context let dist_context = DistributionContext::new( spm_exec, - true, - vec![DistributionContext::new(parquet_exec, false, vec![])], + DistFlags { + dist_changing: true, + ..Default::default() + }, + vec![DistributionContext::new( + parquet_exec, + DistFlags::default(), + vec![], + )], ); // Apply the function diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index b3ed8d9653fe1..8abd82f5166ea 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -34,6 +34,7 @@ use datafusion::{ scalar::ScalarValue, }; use datafusion_catalog::memory::DataSourceExec; +use datafusion_common::JoinType; use datafusion_common::config::ConfigOptions; use datafusion_datasource::{ PartitionedFile, file_groups::FileGroup, file_scan_config::FileScanConfigBuilder, @@ -59,15 +60,16 @@ use datafusion_physical_plan::{ coalesce_partitions::CoalescePartitionsExec, collect, filter::{FilterExec, FilterExecBuilder}, + joins::{DynamicFilterRoutingMode, HashJoinExec, HashJoinExecBuilder, PartitionMode}, projection::ProjectionExec, repartition::RepartitionExec, sorts::sort::SortExec, + union::UnionExec, }; use super::pushdown_utils::{ OptimizationTest, TestNode, TestScanBuilder, TestSource, format_plan_for_test, }; -use datafusion_physical_plan::union::UnionExec; use futures::StreamExt; use object_store::{ObjectStore, memory::InMemory}; use regex::Regex; @@ -177,7 +179,7 @@ fn test_pushdown_into_scan_with_config_options() { #[tokio::test] async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { use datafusion_common::JoinType; - use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + use datafusion_physical_plan::joins::PartitionMode; // Create build side with limited values let build_batches = vec![ @@ -223,18 +225,12 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { col("d", &probe_side_schema).unwrap(), )]; let join = Arc::new( - HashJoinExec::try_new( - build_scan, - probe_scan, - on, - None, - &JoinType::Inner, - None, - PartitionMode::Partitioned, - datafusion_common::NullEquality::NullEqualsNothing, - false, - ) - .unwrap(), + HashJoinExecBuilder::new(build_scan, probe_scan, on, JoinType::Inner) + .with_partition_mode(PartitionMode::Partitioned) + .with_null_equality(datafusion_common::NullEquality::NullEqualsNothing) + .with_dynamic_filter_routing_mode(DynamicFilterRoutingMode::PartitionIndex) + .build() + .unwrap(), ); let join_schema = join.schema(); @@ -279,15 +275,15 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { // Iterate one batch stream.next().await.unwrap().unwrap(); - // Test that filters are pushed down correctly to each side of the join - // NOTE: We dropped the CASE expression here because we now optimize that away if there's only 1 partition + // Test that filters are pushed down correctly to each side of the join. + // This test has no RepartitionExec, so partition-index routing is used. insta::assert_snapshot!( format_plan_for_test(&plan), @r" - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb] - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ {0: d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab])} ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ] " ); } @@ -872,7 +868,7 @@ async fn test_topk_filter_passes_through_coalesce_partitions() { ]; // Create a source that supports all batches - let source = Arc::new(TestSource::new(schema(), true, batches)); + let source = Arc::new(TestSource::new(schema(), true, batches, None)); let base_config = FileScanConfigBuilder::new(ObjectStoreUrl::parse("test://").unwrap(), source) @@ -921,6 +917,38 @@ async fn test_topk_filter_passes_through_coalesce_partitions() { ); } +/// Returns a `SessionConfig` with `pushdown_filters`, `enable_dynamic_filter_pushdown`, and `batch_size=10`. +fn dynamic_filter_session_config() -> SessionConfig { + let mut config = SessionConfig::new().with_batch_size(10); + config.options_mut().execution.parquet.pushdown_filters = true; + config + .options_mut() + .optimizer + .enable_dynamic_filter_pushdown = true; + config +} + +/// Optimizes the plan with `FilterPushdown`, creates a session context, and collects results. +async fn optimize_and_collect( + plan: Arc, + session_config: SessionConfig, +) -> (Arc, Vec) { + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, session_config.options()) + .unwrap(); + let session_ctx = SessionContext::new_with_config(session_config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx)) + .await + .unwrap(); + (plan, batches) +} + #[tokio::test] async fn test_hashjoin_dynamic_filter_pushdown() { use datafusion_common::JoinType; @@ -1008,19 +1036,16 @@ async fn test_hashjoin_dynamic_filter_pushdown() { ); // Actually apply the optimization to the plan and execute to see the filter in action - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = true; - config.optimizer.enable_dynamic_filter_pushdown = true; + let session_config = dynamic_filter_session_config(); let plan = FilterPushdown::new_post_optimization() - .optimize(plan, &config) + .optimize(plan, session_config.options()) .unwrap(); // Test for https://github.com/apache/datafusion/pull/17371: dynamic filter linking survives `with_new_children` let children = plan.children().into_iter().map(Arc::clone).collect(); let plan = plan.with_new_children(children).unwrap(); - let config = SessionConfig::new().with_batch_size(10); - let session_ctx = SessionContext::new_with_config(config); + let session_ctx = SessionContext::new_with_config(session_config); session_ctx.register_object_store( ObjectStoreUrl::parse("test://").unwrap().as_ref(), Arc::new(InMemory::new()), @@ -1218,23 +1243,8 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { ); // Actually apply the optimization to the plan and execute to see the filter in action - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = true; - config.optimizer.enable_dynamic_filter_pushdown = true; - let plan = FilterPushdown::new_post_optimization() - .optimize(plan, &config) - .unwrap(); - let config = SessionConfig::new().with_batch_size(10); - let session_ctx = SessionContext::new_with_config(config); - session_ctx.register_object_store( - ObjectStoreUrl::parse("test://").unwrap().as_ref(), - Arc::new(InMemory::new()), - ); - let state = session_ctx.state(); - let task_ctx = state.task_ctx(); - let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx)) - .await - .unwrap(); + let session_config = dynamic_filter_session_config(); + let (plan, batches) = optimize_and_collect(plan, session_config).await; // Now check what our filter looks like #[cfg(not(feature = "force_hash_collisions"))] @@ -1291,6 +1301,194 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { ); } +#[tokio::test] +async fn test_partitioned_hashjoin_no_repartition_dynamic_filter_pushdown() { + use datafusion_common::JoinType; + + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("e", DataType::Float64, false), + ])); + + // Build side: different data per partition so bounds differ. + // Partition 0: ("aa", "ba") -> bounds a:[aa,aa], b:[ba,ba] + // Partition 1: ("zz", "zz") -> bounds a:[zz,zz], b:[zz,zz] + let build_p0 = vec![ + record_batch!( + ("a", Utf8, ["aa"]), + ("b", Utf8, ["ba"]), + ("c", Float64, [1.0]) + ) + .unwrap(), + ]; + let build_p1 = vec![ + record_batch!( + ("a", Utf8, ["zz"]), + ("b", Utf8, ["zz"]), + ("c", Float64, [2.0]) + ) + .unwrap(), + ]; + + // Probe side: each partition has a mix of matching and non-matching rows. + // Partition 0: ("aa","ba") matches p0 bounds, ("zz","zz") does not + // Partition 1: ("zz","zz") matches p1 bounds, ("aa","ba") does not + let probe_p0 = vec![ + record_batch!( + ("a", Utf8, ["aa", "zz"]), + ("b", Utf8, ["ba", "zz"]), + ("e", Float64, [10.0, 20.0]) + ) + .unwrap(), + ]; + let probe_p1 = vec![ + record_batch!( + ("a", Utf8, ["zz", "aa"]), + ("b", Utf8, ["zz", "ba"]), + ("e", Float64, [30.0, 40.0]) + ) + .unwrap(), + ]; + + // Use 2 file groups per side (2 partitions) with no RepartitionExec. + // This triggers partition-index routing and simulates what happens when + // `preserve_file_partitions` is enabled and declares Hash partitioning on the DataSourceExec. + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches_for_partition(build_p0) + .with_batches_for_partition(build_p1) + .with_file_group(FileGroup::new(vec![PartitionedFile::new( + "build_0.parquet", + 123, + )])) + .with_file_group(FileGroup::new(vec![PartitionedFile::new( + "build_1.parquet", + 123, + )])) + .build(); + + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches_for_partition(probe_p0) + .with_batches_for_partition(probe_p1) + .with_file_group(FileGroup::new(vec![PartitionedFile::new( + "probe_0.parquet", + 123, + )])) + .with_file_group(FileGroup::new(vec![PartitionedFile::new( + "probe_1.parquet", + 123, + )])) + .build(); + + let on = vec![ + ( + col("a", &build_side_schema).unwrap(), + col("a", &probe_side_schema).unwrap(), + ), + ( + col("b", &build_side_schema).unwrap(), + col("b", &probe_side_schema).unwrap(), + ), + ]; + let hash_join = Arc::new( + HashJoinExecBuilder::new( + build_scan, + Arc::clone(&probe_scan), + on, + JoinType::Inner, + ) + .with_partition_mode(PartitionMode::Partitioned) + .with_null_equality(datafusion_common::NullEquality::NullEqualsNothing) + .with_dynamic_filter_routing_mode(DynamicFilterRoutingMode::PartitionIndex) + .build() + .unwrap(), + ); + + // Top-level CoalescePartitionsExec + let cp = Arc::new(CoalescePartitionsExec::new(hash_join)) as Arc; + // Add a sort for deterministic output + let plan = Arc::new(SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new( + col("a", &probe_side_schema).unwrap(), + SortOptions::new(true, false), // descending, nulls_first + )]) + .unwrap(), + cp, + )) as Arc; + + // expect the predicate to be pushed down into the probe side DataSource + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - CoalescePartitionsExec + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={2 groups: [[build_0.parquet], [build_1.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={2 groups: [[probe_0.parquet], [probe_1.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true + output: + Ok: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - CoalescePartitionsExec + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={2 groups: [[build_0.parquet], [build_1.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={2 groups: [[probe_0.parquet], [probe_1.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + " + ); + + let mut session_config = dynamic_filter_session_config(); + // Enable preserve_file_partitions to trigger partition-index routing + session_config + .options_mut() + .optimizer + .preserve_file_partitions = 1; + let (plan, batches) = optimize_and_collect(plan, session_config).await; + + // With partition-index routing, per-partition filters have different bounds: + // Partition 0: a:[aa,aa], b:[ba,ba] with membership {aa,ba} + // Partition 1: a:[zz,zz], b:[zz,zz] with membership {zz,zz} + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - CoalescePartitionsExec + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={2 groups: [[build_0.parquet], [build_1.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={2 groups: [[probe_0.parquet], [probe_1.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ {0: a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]), 1: a@0 >= zz AND a@0 <= zz AND b@1 >= zz AND b@1 <= zz AND struct(a@0, b@1) IN (SET) ([{c0:zz,c1:zz}])} ] + " + ); + + let result = format!("{}", pretty_format_batches(&batches).unwrap()); + + let probe_scan_metrics = probe_scan.metrics().unwrap(); + + // Per-partition filtering prunes non-matching rows: + // Probe p0: ("aa","ba") passes, ("zz","zz") filtered -> 1 row + // Probe p1: ("zz","zz") passes, ("aa","ba") filtered -> 1 row + assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2); + + insta::assert_snapshot!( + result, + @r" + +----+----+-----+----+----+------+ + | a | b | c | a | b | e | + +----+----+-----+----+----+------+ + | zz | zz | 2.0 | zz | zz | 30.0 | + | aa | ba | 1.0 | aa | ba | 10.0 | + +----+----+-----+----+----+------+ + " + ); +} + #[tokio::test] async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { use datafusion_common::JoinType; @@ -1410,23 +1608,8 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { ); // Actually apply the optimization to the plan and execute to see the filter in action - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = true; - config.optimizer.enable_dynamic_filter_pushdown = true; - let plan = FilterPushdown::new_post_optimization() - .optimize(plan, &config) - .unwrap(); - let config = SessionConfig::new().with_batch_size(10); - let session_ctx = SessionContext::new_with_config(config); - session_ctx.register_object_store( - ObjectStoreUrl::parse("test://").unwrap().as_ref(), - Arc::new(InMemory::new()), - ); - let state = session_ctx.state(); - let task_ctx = state.task_ctx(); - let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx)) - .await - .unwrap(); + let session_config = dynamic_filter_session_config(); + let (plan, batches) = optimize_and_collect(plan, session_config).await; // Now check what our filter looks like insta::assert_snapshot!( @@ -1466,7 +1649,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { #[tokio::test] async fn test_nested_hashjoin_dynamic_filter_pushdown() { use datafusion_common::JoinType; - use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + use datafusion_physical_plan::joins::PartitionMode; // Create test data for three tables: t1, t2, t3 // t1: small table with limited values (will be build side of outer join) @@ -1529,18 +1712,12 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() { let inner_join_on = vec![(col("c", &t2_schema).unwrap(), col("d", &t3_schema).unwrap())]; let inner_join = Arc::new( - HashJoinExec::try_new( - t2_scan, - t3_scan, - inner_join_on, - None, - &JoinType::Inner, - None, - PartitionMode::Partitioned, - datafusion_common::NullEquality::NullEqualsNothing, - false, - ) - .unwrap(), + HashJoinExecBuilder::new(t2_scan, t3_scan, inner_join_on, JoinType::Inner) + .with_partition_mode(PartitionMode::Partitioned) + .with_null_equality(datafusion_common::NullEquality::NullEqualsNothing) + .with_dynamic_filter_routing_mode(DynamicFilterRoutingMode::PartitionIndex) + .build() + .unwrap(), ); // Then create outer join: t1.a = t2.b (from inner join result) @@ -1549,17 +1726,16 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() { col("b", &inner_join.schema()).unwrap(), )]; let outer_join = Arc::new( - HashJoinExec::try_new( + HashJoinExecBuilder::new( t1_scan, inner_join as Arc, outer_join_on, - None, - &JoinType::Inner, - None, - PartitionMode::Partitioned, - datafusion_common::NullEquality::NullEqualsNothing, - false, + JoinType::Inner, ) + .with_partition_mode(PartitionMode::Partitioned) + .with_null_equality(datafusion_common::NullEquality::NullEqualsNothing) + .with_dynamic_filter_routing_mode(DynamicFilterRoutingMode::PartitionIndex) + .build() .unwrap(), ) as Arc; @@ -1585,14 +1761,11 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() { ); // Execute the plan to verify the dynamic filters are properly updated - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = true; - config.optimizer.enable_dynamic_filter_pushdown = true; + let session_config = dynamic_filter_session_config(); let plan = FilterPushdown::new_post_optimization() - .optimize(outer_join, &config) + .optimize(outer_join, session_config.options()) .unwrap(); - let config = SessionConfig::new().with_batch_size(10); - let session_ctx = SessionContext::new_with_config(config); + let session_ctx = SessionContext::new_with_config(session_config); session_ctx.register_object_store( ObjectStoreUrl::parse("test://").unwrap().as_ref(), Arc::new(InMemory::new()), @@ -1603,24 +1776,21 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() { // Execute to populate the dynamic filters stream.next().await.unwrap().unwrap(); - // Verify that both the inner and outer join have updated dynamic filters + // No RepartitionExec on either join, so partition-index routing is used. insta::assert_snapshot!( format!("{}", format_plan_for_test(&plan)), @r" - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND d@0 IN (SET) ([ca, cb]) ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ {0: b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab])} ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ {0: d@0 >= ca AND d@0 <= cb AND d@0 IN (SET) ([ca, cb])} ] " ); } #[tokio::test] async fn test_hashjoin_parent_filter_pushdown() { - use datafusion_common::JoinType; - use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; - // Create build side with limited values let build_batches = vec![ record_batch!( @@ -3370,24 +3540,12 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() { )) as Arc; // Apply the optimization with config setting that forces HashTable strategy - let session_config = SessionConfig::default() - .with_batch_size(10) - .set_usize("datafusion.optimizer.hash_join_inlist_pushdown_max_size", 1) - .set_bool("datafusion.execution.parquet.pushdown_filters", true) - .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true); - let plan = FilterPushdown::new_post_optimization() - .optimize(plan, session_config.options()) - .unwrap(); - let session_ctx = SessionContext::new_with_config(session_config); - session_ctx.register_object_store( - ObjectStoreUrl::parse("test://").unwrap().as_ref(), - Arc::new(InMemory::new()), - ); - let state = session_ctx.state(); - let task_ctx = state.task_ctx(); - let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx)) - .await - .unwrap(); + let mut session_config = dynamic_filter_session_config(); + session_config + .options_mut() + .optimizer + .hash_join_inlist_pushdown_max_size = 1; + let (plan, batches) = optimize_and_collect(plan, session_config).await; // Verify that hash_lookup is used instead of IN (SET) let plan_str = format_plan_for_test(&plan).to_string(); @@ -3521,24 +3679,12 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() { )) as Arc; // Apply the optimization with config setting that forces HashTable strategy - let session_config = SessionConfig::default() - .with_batch_size(10) - .set_usize("datafusion.optimizer.hash_join_inlist_pushdown_max_size", 1) - .set_bool("datafusion.execution.parquet.pushdown_filters", true) - .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true); - let plan = FilterPushdown::new_post_optimization() - .optimize(plan, session_config.options()) - .unwrap(); - let session_ctx = SessionContext::new_with_config(session_config); - session_ctx.register_object_store( - ObjectStoreUrl::parse("test://").unwrap().as_ref(), - Arc::new(InMemory::new()), - ); - let state = session_ctx.state(); - let task_ctx = state.task_ctx(); - let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx)) - .await - .unwrap(); + let mut session_config = dynamic_filter_session_config(); + session_config + .options_mut() + .optimizer + .hash_join_inlist_pushdown_max_size = 1; + let (plan, batches) = optimize_and_collect(plan, session_config).await; // Verify that hash_lookup is used instead of IN (SET) let plan_str = format_plan_for_test(&plan).to_string(); @@ -3753,11 +3899,9 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() { ) as Arc; // Apply filter pushdown optimization - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = true; - config.optimizer.enable_dynamic_filter_pushdown = true; + let session_config = dynamic_filter_session_config(); let plan = FilterPushdown::new_post_optimization() - .optimize(plan, &config) + .optimize(plan, session_config.options()) .unwrap(); // Get the HashJoinExec to check the dynamic filter diff --git a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs index 524d33ae6edb6..9048a5471c1d7 100644 --- a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs +++ b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs @@ -20,10 +20,11 @@ use arrow::{array::RecordBatch, compute::concat_batches}; use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr}; use datafusion_common::{Result, config::ConfigOptions, internal_err}; use datafusion_datasource::{ - PartitionedFile, file::FileSource, file_scan_config::FileScanConfig, - file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture, - file_stream::FileOpener, source::DataSourceExec, + PartitionedFile, file::FileSource, file_groups::FileGroup, + file_scan_config::FileScanConfig, file_scan_config::FileScanConfigBuilder, + file_stream::FileOpenFuture, file_stream::FileOpener, source::DataSourceExec, }; +use datafusion_physical_expr::expressions::snapshot_physical_expr_for_partition; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_optimizer::PhysicalOptimizerRule; @@ -53,6 +54,7 @@ pub struct TestOpener { batch_size: Option, projection: Option, predicate: Option>, + partition: usize, } impl FileOpener for TestOpener { @@ -73,9 +75,15 @@ impl FileOpener for TestOpener { batches = new_batches.into_iter().collect(); } + let predicate = self + .predicate + .clone() + .map(|p| snapshot_physical_expr_for_partition(p, self.partition)) + .transpose()?; + let mut new_batches = Vec::new(); for batch in batches { - let batch = if let Some(predicate) = &self.predicate { + let batch = if let Some(predicate) = &predicate { batch_filter(&batch, predicate)? } else { batch @@ -105,18 +113,25 @@ pub struct TestSource { predicate: Option>, batch_size: Option, batches: Vec, + per_partition_batches: Option>>, metrics: ExecutionPlanMetricsSet, projection: Option, table_schema: datafusion_datasource::TableSchema, } impl TestSource { - pub fn new(schema: SchemaRef, support: bool, batches: Vec) -> Self { + pub fn new( + schema: SchemaRef, + support: bool, + batches: Vec, + per_partition_batches: Option>>, + ) -> Self { let table_schema = datafusion_datasource::TableSchema::new(schema, vec![]); Self { support, metrics: ExecutionPlanMetricsSet::new(), batches, + per_partition_batches, predicate: None, batch_size: None, projection: None, @@ -130,13 +145,20 @@ impl FileSource for TestSource { &self, _object_store: Arc, _base_config: &FileScanConfig, - _partition: usize, + partition: usize, ) -> Result> { + let batches = if let Some(ref per_partition) = self.per_partition_batches { + per_partition.get(partition).cloned().unwrap_or_default() + } else { + self.batches.clone() + }; + Ok(Arc::new(TestOpener { - batches: self.batches.clone(), + batches, batch_size: self.batch_size, projection: self.projection.clone(), predicate: self.predicate.clone(), + partition, })) } @@ -245,7 +267,9 @@ impl FileSource for TestSource { pub struct TestScanBuilder { support: bool, batches: Vec, + per_partition_batches: Vec>, schema: SchemaRef, + file_groups: Vec, } impl TestScanBuilder { @@ -253,7 +277,9 @@ impl TestScanBuilder { Self { support: false, batches: vec![], + per_partition_batches: vec![], schema, + file_groups: vec![], } } @@ -267,17 +293,38 @@ impl TestScanBuilder { self } + pub fn with_batches_for_partition(mut self, batches: Vec) -> Self { + self.per_partition_batches.push(batches); + self + } + + pub fn with_file_group(mut self, group: FileGroup) -> Self { + self.file_groups.push(group); + self + } + pub fn build(self) -> Arc { + let per_partition_batches = if self.per_partition_batches.is_empty() { + None + } else { + Some(self.per_partition_batches) + }; let source = Arc::new(TestSource::new( Arc::clone(&self.schema), self.support, self.batches, + per_partition_batches, )); - let base_config = - FileScanConfigBuilder::new(ObjectStoreUrl::parse("test://").unwrap(), source) - .with_file(PartitionedFile::new("test.parquet", 123)) - .build(); - DataSourceExec::from_data_source(base_config) + let mut builder = + FileScanConfigBuilder::new(ObjectStoreUrl::parse("test://").unwrap(), source); + if self.file_groups.is_empty() { + builder = builder.with_file(PartitionedFile::new("test.parquet", 123)); + } else { + for group in self.file_groups { + builder = builder.with_file_group(group); + } + } + DataSourceExec::from_data_source(builder.build()) } } diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f87a30265a17b..965d7ffb14afe 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -41,6 +41,7 @@ use datafusion_common::{ ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err, }; use datafusion_datasource::{PartitionedFile, TableSchema}; +use datafusion_physical_expr::expressions::snapshot_physical_expr_for_partition; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::{ @@ -280,6 +281,7 @@ impl FileOpener for ParquetOpener { let reverse_row_groups = self.reverse_row_groups; let preserve_order = self.preserve_order; + let partition_index = self.partition_index; Ok(Box::pin(async move { #[cfg(feature = "parquet_encryption")] @@ -417,6 +419,14 @@ impl FileOpener for ParquetOpener { predicate = predicate .map(|p| simplifier.simplify(rewriter.rewrite(p)?)) .transpose()?; + + // Snapshot per-partition dynamic filters if available. + // When both sides of a hash join preserve their file partitioning, each partition gets + // its own filter. + predicate = predicate + .map(|p| snapshot_physical_expr_for_partition(p, partition_index)) + .transpose()?; + // Adapt projections to the physical file schema as well projection = projection .try_map_exprs(|p| simplifier.simplify(rewriter.rewrite(p)?))?; @@ -1028,7 +1038,7 @@ mod test { use datafusion_expr::{col, lit}; use datafusion_physical_expr::{ PhysicalExpr, - expressions::{Column, DynamicFilterPhysicalExpr, Literal}, + expressions::{BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal}, planner::logical2physical, projection::ProjectionExprs, }; @@ -1135,6 +1145,12 @@ mod test { self } + /// Set the partition index. + fn with_partition_index(mut self, index: usize) -> Self { + self.partition_index = index; + self + } + /// Enable row group stats pruning. fn with_row_group_stats_pruning(mut self, enable: bool) -> Self { self.enable_row_group_stats_pruning = enable; @@ -2004,4 +2020,129 @@ mod test { "Reverse scan with non-contiguous row groups should correctly map RowSelection" ); } + + #[tokio::test] + async fn test_partition_snapshot_in_opener() { + let store = Arc::new(InMemory::new()) as Arc; + + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let data_size = + write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await; + + let schema = batch.schema(); + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + + let col_a = Arc::new(Column::new("a", 0)) as Arc; + + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + datafusion_physical_expr::expressions::lit(true) as Arc, + )); + + // Creates a DynamicFilterPhysicalExpr with per-partition bounds: + // - Partition 0: a >= 1 AND a <= 3 (matches all rows) + // - Partition 1: a >= 10 AND a <= 20 (excludes all rows via row group stats) + // - Partition 2: a >= 2 AND a <= 4 (matches some rows) + let p0_filter = Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::GtEq, + datafusion_physical_expr::expressions::lit(1i32) as Arc, + )) as Arc, + datafusion_expr::Operator::And, + Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::LtEq, + datafusion_physical_expr::expressions::lit(3i32) as Arc, + )) as Arc, + )) as Arc; + + let p1_filter = Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::GtEq, + datafusion_physical_expr::expressions::lit(10i32) + as Arc, + )) as Arc, + datafusion_expr::Operator::And, + Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::LtEq, + datafusion_physical_expr::expressions::lit(20i32) + as Arc, + )) as Arc, + )) as Arc; + + let p2_filter = Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::GtEq, + datafusion_physical_expr::expressions::lit(2i32) as Arc, + )) as Arc, + datafusion_expr::Operator::And, + Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::LtEq, + datafusion_physical_expr::expressions::lit(4i32) as Arc, + )) as Arc, + )) as Arc; + + dynamic_filter + .update_partitioned(vec![Some(p0_filter), Some(p1_filter), Some(p2_filter)]) + .unwrap(); + + // Partition 0: should read all 3 rows (a >= 1 AND a <= 3 matches) + let opener_p0 = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_predicate(Arc::clone(&dynamic_filter) as Arc) + .with_partition_index(0) + .with_row_group_stats_pruning(true) + .build(); + + let stream = opener_p0.open(file.clone()).unwrap().await.unwrap(); + let (_, num_rows) = count_batches_and_rows(stream).await; + assert_eq!( + num_rows, 3, + "Partition 0 should read all 3 rows (filter a >= 1 AND a <= 3)" + ); + + // Partition 1: should read 0 rows (a >= 10 AND a <= 20 excludes all) + let opener_p1 = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_predicate(Arc::clone(&dynamic_filter) as Arc) + .with_partition_index(1) + .with_row_group_stats_pruning(true) + .build(); + + let stream = opener_p1.open(file.clone()).unwrap().await.unwrap(); + let (_, num_rows) = count_batches_and_rows(stream).await; + assert_eq!( + num_rows, 0, + "Partition 1 should read 0 rows (filter a >= 10 AND a <= 20 excludes all data)" + ); + + // Partition 2: should read all 3 rows (a >= 2 AND a <= 4 matches) + let opener_p2 = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_predicate(Arc::clone(&dynamic_filter) as Arc) + .with_partition_index(2) + .with_row_group_stats_pruning(true) + .build(); + + let stream = opener_p2.open(file).unwrap().await.unwrap(); + let (_, num_rows) = count_batches_and_rows(stream).await; + assert_eq!( + num_rows, 3, + "Partition 2 should read 3 rows (filter a >= 2 AND a <= 4)" + ); + } } diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index d285f8b377eca..2d0b27179749e 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -20,6 +20,7 @@ use std::{any::Any, fmt::Display, hash::Hash, sync::Arc}; use tokio::sync::watch; use crate::PhysicalExpr; +use crate::expressions::lit; use arrow::datatypes::{DataType, Schema}; use datafusion_common::{ Result, @@ -46,6 +47,9 @@ impl FilterState { } } +/// Per-partition filter expressions indexed by partition number. +type PartitionedFilters = Vec>>; + /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it. /// /// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also @@ -86,6 +90,11 @@ struct Inner { /// This is redundant with the watch channel state, but allows us to return immediately /// from `wait_complete()` without subscribing if already complete. is_complete: bool, + /// Per-partition filter expressions for partition-index routing. + /// When both sides of a hash join preserve their file partitioning (no RepartitionExec(Hash)), + /// build-partition i corresponds to probe-partition i. This allows storing per-partition + /// filters so that each partition only sees its own bounds, giving tighter filtering. + partitioned_exprs: PartitionedFilters, } impl Inner { @@ -96,6 +105,7 @@ impl Inner { generation: 1, expr, is_complete: false, + partitioned_exprs: Vec::new(), } } @@ -209,11 +219,6 @@ impl DynamicFilterPhysicalExpr { } } - /// Get the current generation of the expression. - fn current_generation(&self) -> u64 { - self.inner.read().generation - } - /// Get the current expression. /// This will return the current expression with any children /// remapped to match calls to [`PhysicalExpr::with_new_children`]. @@ -242,11 +247,8 @@ impl DynamicFilterPhysicalExpr { // Load the current inner, increment generation, and store the new one let mut current = self.inner.write(); let new_generation = current.generation + 1; - *current = Inner { - generation: new_generation, - expr: new_expr, - is_complete: current.is_complete, - }; + current.generation = new_generation; + current.expr = new_expr; drop(current); // Release the lock before broadcasting // Broadcast the new state to all waiters @@ -272,6 +274,61 @@ impl DynamicFilterPhysicalExpr { }); } + /// Update the dynamic filter with per-partition filter expressions. + /// + /// This stores one filter per partition, indexed by partition number. + pub fn update_partitioned(&self, partition_exprs: PartitionedFilters) -> Result<()> { + let mut current = self.inner.write(); + let new_generation = current.generation + 1; + current.generation = new_generation; + current.partitioned_exprs = partition_exprs; + drop(current); + + // Broadcast the new state. + let _ = self.state_watch.send(FilterState::InProgress { + generation: new_generation, + }); + Ok(()) + } + + /// Get the filter expression for a specific partition. + /// + /// Semantics when per-partition filters are present: + /// - `Some(Some(expr))`: use the partition-local filter. + /// - `Some(None)`: the build partition is known empty, so return `false`. + /// - `None` (out-of-range): return `true` (fail-open) to avoid incorrect pruning if + /// partition alignment/count assumptions are violated by a source. + fn current_for_partition(&self, partition: usize) -> Result> { + let guard = self.inner.read(); + if guard.partitioned_exprs.is_empty() { + let expr = Arc::clone(guard.expr()); + drop(guard); + return Self::remap_children( + &self.children, + self.remapped_children.as_ref(), + expr, + ); + } + match guard.partitioned_exprs.get(partition) { + Some(Some(expr)) => { + let expr = Arc::clone(expr); + drop(guard); + Self::remap_children( + &self.children, + self.remapped_children.as_ref(), + expr, + ) + } + Some(None) => Ok(lit(false) as Arc), + None => Ok(lit(true) as Arc), + } + } + + /// Returns `true` if per-partition filter data has been set. + fn has_partitioned_filters(&self) -> bool { + !self.inner.read().partitioned_exprs.is_empty() + } + /// Wait asynchronously for any update to this filter. /// /// This method will return when [`Self::update`] is called and the generation increases. @@ -330,18 +387,35 @@ impl DynamicFilterPhysicalExpr { fn render( &self, f: &mut std::fmt::Formatter<'_>, - render_expr: impl FnOnce( + render_expr: impl Fn( Arc, &mut std::fmt::Formatter<'_>, ) -> std::fmt::Result, ) -> std::fmt::Result { - let inner = self.current().map_err(|_| std::fmt::Error)?; - let current_generation = self.current_generation(); + let guard = self.inner.read(); + let current_generation = guard.generation; + let partitioned_exprs = guard.partitioned_exprs.clone(); + drop(guard); + write!(f, "DynamicFilter [ ")?; - if current_generation == 1 { + if !partitioned_exprs.is_empty() { + write!(f, "{{")?; + for (i, partition) in partitioned_exprs.iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + write!(f, "{i}: ")?; + match partition { + Some(expr) => render_expr(Arc::clone(expr), f)?, + None => write!(f, "pruned")?, + } + } + write!(f, "}}")?; + } else if current_generation == 1 { write!(f, "empty")?; } else { - render_expr(inner, f)?; + let current = self.current().map_err(|_| std::fmt::Error)?; + render_expr(current, f)?; } write!(f, " ]") @@ -450,6 +524,25 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { } } +/// Snapshot a `PhysicalExpr` tree, replacing any [`DynamicFilterPhysicalExpr`] that +/// has per-partition data with its partition-specific filter expression. +/// If a `DynamicFilterPhysicalExpr` does not have partitioned data, it is left unchanged. +pub fn snapshot_physical_expr_for_partition( + expr: Arc, + partition: usize, +) -> Result> { + expr.transform_up(|e| { + if let Some(dynamic) = e.as_any().downcast_ref::() + && dynamic.has_partitioned_filters() + { + let snapshot = dynamic.current_for_partition(partition)?; + return Ok(Transformed::yes(snapshot)); + } + Ok(Transformed::no(e)) + }) + .data() +} + #[cfg(test)] mod test { use crate::{ @@ -867,4 +960,142 @@ mod test { "Hash should be stable after update (identity-based)" ); } + + #[test] + fn test_update_partitioned_and_current_for_partition() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let col_a = col("a", &schema).unwrap(); + + let dynamic_filter = DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + lit(true) as Arc, + ); + + // Create per-partition expressions + let partition_0_expr = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::GtEq, + lit(10) as Arc, + )) as Arc; + let partition_1_expr = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::GtEq, + lit(20) as Arc, + )) as Arc; + + let partition_exprs = vec![ + Some(Arc::clone(&partition_0_expr)), + Some(Arc::clone(&partition_1_expr)), + ]; + + dynamic_filter.update_partitioned(partition_exprs).unwrap(); + + assert!(dynamic_filter.has_partitioned_filters()); + + // Partition 0 should get its specific filter + let p0 = dynamic_filter.current_for_partition(0).unwrap(); + assert_eq!(format!("{p0}"), format!("{partition_0_expr}")); + + // Partition 1 should get its specific filter + let p1 = dynamic_filter.current_for_partition(1).unwrap(); + assert_eq!(format!("{p1}"), format!("{partition_1_expr}")); + } + + #[test] + fn test_current_for_partition_empty_and_out_of_range() { + let dynamic_filter = + DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc); + + let partition_exprs = vec![ + Some(lit(42) as Arc), + None, // Empty partition + ]; + + dynamic_filter.update_partitioned(partition_exprs).unwrap(); + + // Partition 1 is empty, should return lit(false) + let p1 = dynamic_filter.current_for_partition(1).unwrap(); + assert_eq!(format!("{p1}"), "false"); + + // Partition 5 is out of range, should fail-open to lit(true) + let p5 = dynamic_filter.current_for_partition(5).unwrap(); + assert_eq!(format!("{p5}"), "true"); + } + + #[test] + fn test_snapshot_physical_expr_for_partition_with_data() { + use super::snapshot_physical_expr_for_partition; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let col_a = col("a", &schema).unwrap(); + + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + lit(true) as Arc, + )); + + let partition_0_expr = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::GtEq, + lit(10) as Arc, + )) as Arc; + let partition_1_expr = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::LtEq, + lit(20) as Arc, + )) as Arc; + + let partition_exprs = vec![ + Some(Arc::clone(&partition_0_expr)), + Some(Arc::clone(&partition_1_expr)), + ]; + dynamic_filter.update_partitioned(partition_exprs).unwrap(); + + // Wrap dynamic filter in a BinaryExpr + let wrapper = Arc::new(BinaryExpr::new( + Arc::clone(&dynamic_filter) as Arc, + datafusion_expr::Operator::And, + lit(true) as Arc, + )) as Arc; + + // Snapshot for partition 0 should replace the dynamic filter with partition 0's expr + let snapped_0 = + snapshot_physical_expr_for_partition(Arc::clone(&wrapper), 0).unwrap(); + assert!( + format!("{snapped_0}").contains(">="), + "Expected >= for partition 0, got: {snapped_0}" + ); + + // Snapshot for partition 1 should replace with partition 1's expr + let snapped_1 = snapshot_physical_expr_for_partition(wrapper, 1).unwrap(); + assert!( + format!("{snapped_1}").contains("<="), + "Expected <= for partition 1, got: {snapped_1}" + ); + } + + #[test] + fn test_snapshot_physical_expr_for_partition_without_data() { + use super::snapshot_physical_expr_for_partition; + + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![], + lit(42) as Arc, + )); + + // No partitioned data set should leave the DynamicFilterPhysicalExpr unchanged + let result = snapshot_physical_expr_for_partition( + Arc::clone(&dynamic_filter) as Arc, + 0, + ) + .unwrap(); + + assert!( + result + .as_any() + .downcast_ref::() + .is_some(), + "Without partitioned data, should leave DynamicFilterPhysicalExpr unchanged" + ); + } } diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index c9e02708d6c28..c7084ca9ab750 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -46,6 +46,7 @@ pub use cast_column::CastColumnExpr; pub use column::{Column, col, with_new_schema}; pub use datafusion_expr::utils::format_state_name; pub use dynamic_filters::DynamicFilterPhysicalExpr; +pub use dynamic_filters::snapshot_physical_expr_for_partition; pub use in_list::{InListExpr, in_list}; pub use is_not_null::{IsNotNullExpr, is_not_null}; pub use is_null::{IsNullExpr, is_null}; diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 790669b5c9dbf..3e09d0176d11e 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -49,7 +49,8 @@ use datafusion_physical_plan::aggregates::{ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::execution_plan::EmissionType; use datafusion_physical_plan::joins::{ - CrossJoinExec, HashJoinExec, HashJoinExecBuilder, PartitionMode, SortMergeJoinExec, + CrossJoinExec, DynamicFilterRoutingMode, HashJoinExec, HashJoinExecBuilder, + PartitionMode, SortMergeJoinExec, }; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; @@ -296,6 +297,7 @@ pub fn adjust_input_keys_ordering( mode, null_equality, null_aware, + dynamic_filter_routing_mode, .. }) = plan.as_any().downcast_ref::() { @@ -317,6 +319,7 @@ pub fn adjust_input_keys_ordering( .with_partition_mode(PartitionMode::Partitioned) .with_null_equality(*null_equality) .with_null_aware(*null_aware) + .with_dynamic_filter_routing_mode(*dynamic_filter_routing_mode) .build() .map(|e| Arc::new(e) as _) }; @@ -622,6 +625,7 @@ pub fn reorder_join_keys_to_inputs( mode, null_equality, null_aware, + dynamic_filter_routing_mode, .. }) = plan_any.downcast_ref::() { @@ -651,6 +655,7 @@ pub fn reorder_join_keys_to_inputs( .with_partition_mode(PartitionMode::Partitioned) .with_null_equality(*null_equality) .with_null_aware(*null_aware) + .with_dynamic_filter_routing_mode(*dynamic_filter_routing_mode) .build()?, )); } @@ -870,7 +875,14 @@ fn add_roundrobin_on_top( let new_plan = Arc::new(repartition) as _; - Ok(DistributionContext::new(new_plan, true, vec![input])) + Ok(DistributionContext::new( + new_plan, + DistFlags { + dist_changing: true, + repartitioned: true, + }, + vec![input], + )) } else { // Partition is not helpful, we already have desired number of partitions. Ok(input) @@ -939,7 +951,14 @@ fn add_hash_on_top( .with_preserve_order(); let plan = Arc::new(repartition) as _; - return Ok(DistributionContext::new(plan, true, vec![input])); + return Ok(DistributionContext::new( + plan, + DistFlags { + dist_changing: true, + repartitioned: true, + }, + vec![input], + )); } Ok(input) @@ -976,7 +995,14 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext { Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _ }; - DistributionContext::new(new_plan, true, vec![input]) + DistributionContext::new( + new_plan, + DistFlags { + dist_changing: true, + repartitioned: input.data.repartitioned, + }, + vec![input], + ) } else { input } @@ -1040,7 +1066,7 @@ pub fn replace_order_preserving_variants( .children .into_iter() .map(|child| { - if child.data { + if child.data.dist_changing { replace_order_preserving_variants(child) } else { Ok(child) @@ -1377,7 +1403,7 @@ pub fn ensure_distribution( .ordering_satisfy_requirement(sort_req.clone())?; if (!ordering_satisfied || !order_preserving_variants_desirable) - && child.data + && child.data.dist_changing { child = replace_order_preserving_variants(child)?; // If ordering requirements were satisfied before repartitioning, @@ -1395,7 +1421,7 @@ pub fn ensure_distribution( } } // Stop tracking distribution changing operators - child.data = false; + child.data.dist_changing = false; } else { // no ordering requirement match requirement { @@ -1454,21 +1480,58 @@ pub fn ensure_distribution( plan.with_new_children(children_plans)? }; + // For partitioned hash joins, decide dynamic filter routing mode. + // + // PartitionIndex routing requires that partition `i` on the build side corresponds to + // partition `i` on the probe side. This holds when both sides' partitioning comes from + // file-grouped sources (via `preserve_file_partitions`) rather than hash repartitioning. + plan = if let Some(hash_join) = plan.as_any().downcast_ref::() + && matches!(hash_join.mode, PartitionMode::Partitioned) + { + let routing_mode = + if children[0].data.repartitioned || children[1].data.repartitioned { + DynamicFilterRoutingMode::CaseHash + } else { + DynamicFilterRoutingMode::PartitionIndex + }; + if routing_mode != hash_join.dynamic_filter_routing_mode { + Arc::new( + HashJoinExecBuilder::from(hash_join) + .with_dynamic_filter_routing_mode(routing_mode) + .build()?, + ) + } else { + plan + } + } else { + plan + }; + Ok(Transformed::yes(DistributionContext::new( plan, data, children, ))) } -/// Keeps track of distribution changing operators (like `RepartitionExec`, -/// `SortPreservingMergeExec`, `CoalescePartitionsExec`) and their ancestors. -/// Using this information, we can optimize distribution of the plan if/when -/// necessary. -pub type DistributionContext = PlanContext; +/// State propagated during the bottom-up pass in [`ensure_distribution`]. +#[derive(Clone, Copy, Debug, Default)] +pub struct DistFlags { + /// Whether a distribution-changing operator (`RepartitionExec`, `SortPreservingMergeExec`, + /// `CoalescePartitionsExec`) exists in the subtree. + pub dist_changing: bool, + /// Whether the output partitioning originates from a [`RepartitionExec`]. + /// Used by partitioned hash joins to choose the dynamic filter routing + /// mode. + pub repartitioned: bool, +} + +pub type DistributionContext = PlanContext; fn update_children(mut dist_context: DistributionContext) -> Result { for child_context in dist_context.children.iter_mut() { let child_plan_any = child_context.plan.as_any(); - child_context.data = + + // Track distribution-changing operators for order-preservation optimization. + child_context.data.dist_changing = if let Some(repartition) = child_plan_any.downcast_ref::() { !matches!( repartition.partitioning(), @@ -1478,23 +1541,46 @@ fn update_children(mut dist_context: DistributionContext) -> Result() || child_plan_any.is::() || child_context.plan.children().is_empty() - || child_context.children[0].data + || child_context.children[0].data.dist_changing || child_context .plan .required_input_distribution() .iter() .zip(child_context.children.iter()) .any(|(required_dist, child_context)| { - child_context.data + child_context.data.dist_changing && matches!( required_dist, Distribution::UnspecifiedDistribution ) }) - } + }; + + // Track whether partitioning originates from a RepartitionExec, following the partition + // determining path through the context tree. + child_context.data.repartitioned = + if let Some(repartition) = child_plan_any.downcast_ref::() { + !matches!( + repartition.partitioning(), + Partitioning::UnknownPartitioning(_) + ) + } else if child_context.plan.children().is_empty() { + false + } else if let Some(hj) = child_plan_any.downcast_ref::() + && matches!(hj.mode, PartitionMode::CollectLeft) + { + // CollectLeft: output partitioning comes from the probe (right) side + child_context + .children + .get(1) + .map(|c| c.data.repartitioned) + .unwrap_or(false) + } else { + child_context.children.iter().any(|c| c.data.repartitioned) + }; } - dist_context.data = false; + dist_context.data = DistFlags::default(); Ok(dist_context) } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 6b18b56413b71..cc8a416a5330e 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -256,6 +256,7 @@ pub struct HashJoinExecBuilder { filter: Option, projection: Option, partition_mode: PartitionMode, + dynamic_filter_routing_mode: DynamicFilterRoutingMode, null_equality: NullEquality, null_aware: bool, } @@ -275,6 +276,7 @@ impl HashJoinExecBuilder { filter: None, projection: None, partition_mode: PartitionMode::Auto, + dynamic_filter_routing_mode: DynamicFilterRoutingMode::CaseHash, join_type, null_equality: NullEquality::NullEqualsNothing, null_aware: false, @@ -304,6 +306,15 @@ impl HashJoinExecBuilder { self } + /// Set dynamic filter routing mode for partitioned joins. + pub fn with_dynamic_filter_routing_mode( + mut self, + mode: DynamicFilterRoutingMode, + ) -> Self { + self.dynamic_filter_routing_mode = mode; + self + } + /// Set null equality property. pub fn with_null_equality(mut self, null_equality: NullEquality) -> Self { self.null_equality = null_equality; @@ -326,6 +337,7 @@ impl HashJoinExecBuilder { filter, projection, partition_mode, + dynamic_filter_routing_mode, null_equality, null_aware, } = self; @@ -386,6 +398,7 @@ impl HashJoinExecBuilder { left_fut: Default::default(), random_state, mode: partition_mode, + dynamic_filter_routing_mode, metrics: ExecutionPlanMetricsSet::new(), projection, column_indices, @@ -407,12 +420,29 @@ impl From<&HashJoinExec> for HashJoinExecBuilder { filter: exec.filter.clone(), projection: exec.projection.clone(), partition_mode: exec.mode, + dynamic_filter_routing_mode: exec.dynamic_filter_routing_mode, null_equality: exec.null_equality, null_aware: exec.null_aware, } } } +/// Routing mode for partitioned dynamic filters in hash joins. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum DynamicFilterRoutingMode { + /// Route probe rows by hash (CASE expression). + /// + /// This mode is safe without partition-index alignment assumptions and is used when the + /// optimizer detects hash repartitioning on the relevant path. + CaseHash, + /// Route by partition index (`i -> i`). + /// + /// This assumes build/probe partition indices are aligned for dynamic filter consumers: + /// the filter for build partition `i` must be applied to probe partition `i`. + /// The optimizer should only select this mode when that mapping is expected to hold. + PartitionIndex, +} + #[expect(rustdoc::private_intra_doc_links)] /// Join execution plan: Evaluates equijoin predicates in parallel on multiple /// partitions using a hash table and an optional filter list to apply post @@ -630,6 +660,11 @@ pub struct HashJoinExec { random_state: SeededRandomState, /// Partitioning mode to use pub mode: PartitionMode, + /// Dynamic filter routing mode for partitioned joins. + /// + /// `CaseHash` uses hash-based routing and does not require alignment. + /// `PartitionIndex` assumes build/probe partition alignment. + pub dynamic_filter_routing_mode: DynamicFilterRoutingMode, /// Execution metrics metrics: ExecutionPlanMetricsSet, /// The projection indices of the columns in the output schema of join @@ -723,19 +758,6 @@ impl HashJoinExec { if !config.optimizer.enable_join_dynamic_filter_pushdown { return false; } - - // `preserve_file_partitions` can report Hash partitioning for Hive-style - // file groups, but those partitions are not actually hash-distributed. - // Partitioned dynamic filters rely on hash routing, so disable them in - // this mode to avoid incorrect results. Follow-up work: enable dynamic - // filtering for preserve_file_partitioned scans (issue #20195). - // https://github.com/apache/datafusion/issues/20195 - if config.optimizer.preserve_file_partitions > 0 - && self.mode == PartitionMode::Partitioned - { - return false; - } - true } @@ -789,6 +811,17 @@ impl HashJoinExec { self.dynamic_filter.as_ref().map(|df| &df.filter) } + /// Determines whether partition-index routing should be used instead of CASE hash routing. + /// + /// Enabled when: + /// 1. The join is in `Partitioned` mode + /// 2. The optimizer selected `DynamicFilterRoutingMode::PartitionIndex`. + fn should_use_partition_index(&self) -> bool { + matches!(self.mode, PartitionMode::Partitioned) + && self.dynamic_filter_routing_mode + == DynamicFilterRoutingMode::PartitionIndex + } + /// Calculate order preservation flags for this hash join. fn maintains_input_order(join_type: JoinType) -> Vec { vec![ @@ -930,25 +963,27 @@ impl HashJoinExec { ) -> Result> { let left = self.left(); let right = self.right(); - let new_join = HashJoinExec::try_new( + let new_join = HashJoinExecBuilder::new( Arc::clone(right), Arc::clone(left), - self.on() + self.on .iter() .map(|(l, r)| (Arc::clone(r), Arc::clone(l))) .collect(), - self.filter().map(JoinFilter::swap), - &self.join_type().swap(), - swap_join_projection( - left.schema().fields().len(), - right.schema().fields().len(), - self.projection.as_deref(), - self.join_type(), - ), - partition_mode, - self.null_equality(), - self.null_aware, - )?; + self.join_type().swap(), + ) + .with_filter(self.filter().map(JoinFilter::swap)) + .with_projection(swap_join_projection( + left.schema().fields().len(), + right.schema().fields().len(), + self.projection.as_deref(), + self.join_type(), + )) + .with_partition_mode(partition_mode) + .with_null_equality(self.null_equality()) + .with_null_aware(self.null_aware) + .with_dynamic_filter_routing_mode(self.dynamic_filter_routing_mode) + .build()?; // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again if matches!( self.join_type(), @@ -1126,6 +1161,8 @@ impl ExecutionPlan for HashJoinExec { left_fut: Arc::clone(&self.left_fut), random_state: self.random_state.clone(), mode: self.mode, + // Preserve optimizer-selected routing mode across plan rewrites. + dynamic_filter_routing_mode: self.dynamic_filter_routing_mode, metrics: ExecutionPlanMetricsSet::new(), projection: self.projection.clone(), column_indices: self.column_indices.clone(), @@ -1157,6 +1194,7 @@ impl ExecutionPlan for HashJoinExec { left_fut: Arc::new(OnceAsync::default()), random_state: self.random_state.clone(), mode: self.mode, + dynamic_filter_routing_mode: self.dynamic_filter_routing_mode, metrics: ExecutionPlanMetricsSet::new(), projection: self.projection.clone(), column_indices: self.column_indices.clone(), @@ -1267,6 +1305,7 @@ impl ExecutionPlan for HashJoinExec { // Initialize build_accumulator lazily with runtime partition counts (only if enabled) // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing let repartition_random_state = REPARTITION_RANDOM_STATE; + let use_partition_index = self.should_use_partition_index(); let build_accumulator = enable_dynamic_filter_pushdown .then(|| { self.dynamic_filter.as_ref().map(|df| { @@ -1284,6 +1323,7 @@ impl ExecutionPlan for HashJoinExec { filter, on_right, repartition_random_state, + use_partition_index, )) }))) }) @@ -1384,18 +1424,21 @@ impl ExecutionPlan for HashJoinExec { &schema, self.filter(), )? { - Ok(Some(Arc::new(HashJoinExec::try_new( - Arc::new(projected_left_child), - Arc::new(projected_right_child), - join_on, - join_filter, - self.join_type(), + Ok(Some(Arc::new( + HashJoinExecBuilder::new( + Arc::new(projected_left_child), + Arc::new(projected_right_child), + join_on, + self.join_type, + ) + .with_filter(join_filter) // Returned early if projection is not None - None, - *self.partition_mode(), - self.null_equality, - self.null_aware, - )?))) + .with_partition_mode(*self.partition_mode()) + .with_null_equality(self.null_equality) + .with_null_aware(self.null_aware) + .with_dynamic_filter_routing_mode(self.dynamic_filter_routing_mode) + .build()?, + ))) } else { try_embed_projection(projection, self) } @@ -1483,6 +1526,7 @@ impl ExecutionPlan for HashJoinExec { left_fut: Arc::clone(&self.left_fut), random_state: self.random_state.clone(), mode: self.mode, + dynamic_filter_routing_mode: self.dynamic_filter_routing_mode, metrics: ExecutionPlanMetricsSet::new(), projection: self.projection.clone(), column_indices: self.column_indices.clone(), @@ -5715,4 +5759,69 @@ mod tests { .contains("null_aware anti join only supports single column join key") ); } + + fn build_partition_index_mode_test_join( + partition_mode: PartitionMode, + routing_mode: DynamicFilterRoutingMode, + ) -> Result { + let left = build_table( + ("a1", &vec![1, 2]), + ("b1", &vec![4, 5]), + ("c1", &vec![7, 8]), + ); + let right = build_table( + ("a2", &vec![10, 20]), + ("b1", &vec![4, 5]), + ("c2", &vec![70, 80]), + ); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) + as Arc, + Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) + as Arc, + )]; + + HashJoinExecBuilder::new(left, right, on, JoinType::Inner) + .with_partition_mode(partition_mode) + .with_dynamic_filter_routing_mode(routing_mode) + .build() + } + + #[test] + fn test_should_use_partition_index() -> Result<()> { + let cases = [ + ( + PartitionMode::CollectLeft, + DynamicFilterRoutingMode::PartitionIndex, + false, + "CollectLeft should never use partition index", + ), + ( + PartitionMode::Partitioned, + DynamicFilterRoutingMode::PartitionIndex, + true, + "Partitioned mode with PartitionIndex should use partition index", + ), + ( + PartitionMode::Partitioned, + DynamicFilterRoutingMode::CaseHash, + false, + "Partitioned mode with CaseHash should use CASE routing", + ), + ( + PartitionMode::Auto, + DynamicFilterRoutingMode::PartitionIndex, + false, + "Auto mode should never use partition index", + ), + ]; + + for (partition_mode, routing_mode, expected, message) in cases { + let join = + build_partition_index_mode_test_join(partition_mode, routing_mode)?; + assert_eq!(join.should_use_partition_index(), expected, "{message}"); + } + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/joins/hash_join/mod.rs b/datafusion/physical-plan/src/joins/hash_join/mod.rs index b915802ea4015..93afcab849a78 100644 --- a/datafusion/physical-plan/src/joins/hash_join/mod.rs +++ b/datafusion/physical-plan/src/joins/hash_join/mod.rs @@ -17,7 +17,7 @@ //! [`HashJoinExec`] Partitioned Hash Join Operator -pub use exec::{HashJoinExec, HashJoinExecBuilder}; +pub use exec::{DynamicFilterRoutingMode, HashJoinExec, HashJoinExecBuilder}; pub use partitioned_hash_eval::{HashExpr, HashTableLookupExpr, SeededRandomState}; mod exec; diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index f32dc7fa80268..9bc425b47e15f 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -50,7 +50,7 @@ use tokio::sync::Barrier; pub(crate) struct ColumnBounds { /// The minimum value observed for this column pub(crate) min: ScalarValue, - /// The maximum value observed for this column + /// The maximum value observed for this column pub(crate) max: ScalarValue, } @@ -226,6 +226,9 @@ pub(crate) struct SharedBuildAccumulator { repartition_random_state: SeededRandomState, /// Schema of the probe (right) side for evaluating filter expressions probe_schema: Arc, + /// Use partition-index routing (probe partition `i` uses build filter `i`). + /// Requires aligned build/probe partitions. + use_partition_index: bool, } /// Strategy for filter pushdown (decided at collection time) @@ -269,6 +272,53 @@ enum AccumulatedBuildData { }, } +/// Build a combined filter expression (bounds AND membership) for a single partition. +/// +/// Combines membership and bounds expressions for multi-layer optimization: +/// - Bounds (min/max): Enable statistics-based pruning (Parquet row group/file skipping) +/// - Membership (InList/hash lookup): Enables: +/// * Precise filtering (exact value matching) +/// * Bloom filter utilization (if present in Parquet files) +/// * Better pruning for data types where min/max isn't effective (e.g., UUIDs) +/// +/// Together, they provide complementary benefits and maximize data skipping. +fn build_partition_filter_expr( + on_right: &[PhysicalExprRef], + partition: &PartitionData, + probe_schema: &Schema, +) -> Result>> { + // Create membership predicate (InList for small build sides, hash lookup otherwise) + let membership_expr = create_membership_predicate( + on_right, + partition.pushdown.clone(), + &HASH_JOIN_SEED, + probe_schema, + )?; + + // Create bounds check expression (if bounds available) + let bounds_expr = create_bounds_predicate(on_right, &partition.bounds); + + match (membership_expr, bounds_expr) { + // Both available: combine with AND + (Some(membership), Some(bounds)) => Ok(Some(Arc::new(BinaryExpr::new( + bounds, + Operator::And, + membership, + )) as Arc)), + // Membership available but no bounds + // This is reachable when we have data but bounds aren't available + // (e.g., unsupported data types or no columns with bounds) + (Some(membership), None) => Ok(Some(membership)), + // Bounds available but no membership. + // This should be unreachable in practice: we can always push down a reference + // to the hash table. + // But it seems safer to handle it defensively. + (None, Some(bounds)) => Ok(Some(bounds)), + // No filter available (e.g., empty build side) + (None, None) => Ok(None), + } +} + impl SharedBuildAccumulator { /// Creates a new SharedBuildAccumulator configured for the given partition mode /// @@ -302,6 +352,7 @@ impl SharedBuildAccumulator { dynamic_filter: Arc, on_right: Vec, repartition_random_state: SeededRandomState, + use_partition_index: bool, ) -> Self { // Troubleshooting: If partition counts are incorrect, verify this logic matches // the actual execution pattern in collect_build_side() @@ -342,6 +393,7 @@ impl SharedBuildAccumulator { on_right, repartition_random_state, probe_schema: right_child.schema(), + use_partition_index, } } @@ -402,181 +454,137 @@ impl SharedBuildAccumulator { // CollectLeft: Simple conjunction of bounds and membership check AccumulatedBuildData::CollectLeft { data } => { if let Some(partition_data) = data { - // Create membership predicate (InList for small build sides, hash lookup otherwise) - let membership_expr = create_membership_predicate( - &self.on_right, - partition_data.pushdown.clone(), - &HASH_JOIN_SEED, - self.probe_schema.as_ref(), - )?; - - // Create bounds check expression (if bounds available) - let bounds_expr = create_bounds_predicate( - &self.on_right, - &partition_data.bounds, - ); - - // Combine membership and bounds expressions for multi-layer optimization: - // - Bounds (min/max): Enable statistics-based pruning (Parquet row group/file skipping) - // - Membership (InList/hash lookup): Enables: - // * Precise filtering (exact value matching) - // * Bloom filter utilization (if present in Parquet files) - // * Better pruning for data types where min/max isn't effective (e.g., UUIDs) - // Together, they provide complementary benefits and maximize data skipping. // Only update the filter if we have something to push down - if let Some(filter_expr) = match (membership_expr, bounds_expr) { - (Some(membership), Some(bounds)) => { - // Both available: combine with AND - Some(Arc::new(BinaryExpr::new( - bounds, - Operator::And, - membership, - )) - as Arc) - } - (Some(membership), None) => { - // Membership available but no bounds - // This is reachable when we have data but bounds aren't available - // (e.g., unsupported data types or no columns with bounds) - Some(membership) - } - (None, Some(bounds)) => { - // Bounds available but no membership. - // This should be unreachable in practice: we can always push down a reference - // to the hash table. - // But it seems safer to handle it defensively. - Some(bounds) - } - (None, None) => { - // No filter available (e.g., empty build side) - // Don't update the filter, but continue to mark complete - None - } - } { + if let Some(filter_expr) = build_partition_filter_expr( + &self.on_right, + partition_data, + &self.probe_schema, + )? { self.dynamic_filter.update(filter_expr)?; } } } // Partitioned: CASE expression routing to per-partition filters AccumulatedBuildData::Partitioned { partitions } => { - // Collect all partition data (should all be Some at this point) - let partition_data: Vec<_> = - partitions.iter().filter_map(|p| p.as_ref()).collect(); - - if !partition_data.is_empty() { - // Build a CASE expression that combines range checks AND membership checks - // CASE (hash_repartition(join_keys) % num_partitions) - // WHEN 0 THEN (col >= min_0 AND col <= max_0 AND ...) AND membership_check_0 - // WHEN 1 THEN (col >= min_1 AND col <= max_1 AND ...) AND membership_check_1 + if self.use_partition_index { + // Partition-index routing: both sides preserve file partitioning (no + // RepartitionExec(Hash)), so build-partition i corresponds to + // probe-partition i. Store per-partition filters directly: + // partitioned_exprs[0] = (col >= min_0 AND col <= max_0 AND ...) AND membership_check_0 + // partitioned_exprs[1] = (col >= min_1 AND col <= max_1 AND ...) AND membership_check_1 // ... - // ELSE false - // END - - let num_partitions = partition_data.len(); - - // Create base expression: hash_repartition(join_keys) % num_partitions - let routing_hash_expr = Arc::new(HashExpr::new( - self.on_right.clone(), - self.repartition_random_state.clone(), - "hash_repartition".to_string(), - )) - as Arc; - - let modulo_expr = Arc::new(BinaryExpr::new( - routing_hash_expr, - Operator::Modulo, - lit(ScalarValue::UInt64(Some(num_partitions as u64))), - )) - as Arc; - - // Create WHEN branches for each partition - let when_then_branches: Vec<( - Arc, - Arc, - )> = partitions - .iter() - .enumerate() - .filter_map(|(partition_id, partition_opt)| { - partition_opt.as_ref().and_then(|partition| { - // Skip empty partitions - they would always return false anyway - match &partition.pushdown { - PushdownStrategy::Empty => None, - _ => Some((partition_id, partition)), + // partitioned_exprs[N] = None (empty partition — no data) + let per_partition: Vec>> = + partitions + .iter() + .map(|p| match p { + Some(partition) + if !matches!( + partition.pushdown, + PushdownStrategy::Empty + ) => + { + let expr = build_partition_filter_expr( + &self.on_right, + partition, + &self.probe_schema, + )? + // If no partition-local filter can be built, fall back to + // neutral predicate for this partition. + .unwrap_or_else(|| lit(true)); + Ok(Some(expr)) } + _ => Ok(None), }) - }) - .map(|(partition_id, partition)| -> Result<_> { - // WHEN partition_id - let when_expr = - lit(ScalarValue::UInt64(Some(partition_id as u64))); - - // THEN: Combine bounds check AND membership predicate - - // 1. Create membership predicate (InList for small build sides, hash lookup otherwise) - let membership_expr = create_membership_predicate( - &self.on_right, - partition.pushdown.clone(), - &HASH_JOIN_SEED, - self.probe_schema.as_ref(), - )?; - - // 2. Create bounds check expression for this partition (if bounds available) - let bounds_expr = create_bounds_predicate( - &self.on_right, - &partition.bounds, - ); - - // 3. Combine membership and bounds expressions - let then_expr = match (membership_expr, bounds_expr) { - (Some(membership), Some(bounds)) => { - // Both available: combine with AND - Arc::new(BinaryExpr::new( - bounds, - Operator::And, - membership, - )) - as Arc - } - (Some(membership), None) => { - // Membership available but no bounds (e.g., unsupported data types) - membership - } - (None, Some(bounds)) => { - // Bounds available but no membership. - // This should be unreachable in practice: we can always push down a reference - // to the hash table. - // But it seems safer to handle it defensively. - bounds - } - (None, None) => { - // No filter for this partition - should not happen due to filter_map above - // but handle defensively by returning a "true" literal - lit(true) - } - }; - - Ok((when_expr, then_expr)) - }) - .collect::>>()?; - - // Optimize for single partition: skip CASE expression entirely - let filter_expr = if when_then_branches.is_empty() { - // All partitions are empty: no rows can match - lit(false) - } else if when_then_branches.len() == 1 { - // Single partition: just use the condition directly - // since hash % 1 == 0 always, the WHEN 0 branch will always match - Arc::clone(&when_then_branches[0].1) - } else { - // Multiple partitions: create CASE expression - Arc::new(CaseExpr::try_new( - Some(modulo_expr), - when_then_branches, - Some(lit(false)), // ELSE false - )?) as Arc - }; - - self.dynamic_filter.update(filter_expr)?; + .collect::>()?; + + self.dynamic_filter.update_partitioned(per_partition)?; + } else { + // Collect all partition data (should all be Some at this point) + let partition_data: Vec<_> = + partitions.iter().filter_map(|p| p.as_ref()).collect(); + + if !partition_data.is_empty() { + // Build a CASE expression that combines range checks AND membership checks + // CASE (hash_repartition(join_keys) % num_partitions) + // WHEN 0 THEN (col >= min_0 AND col <= max_0 AND ...) AND membership_check_0 + // WHEN 1 THEN (col >= min_1 AND col <= max_1 AND ...) AND membership_check_1 + // ... + // ELSE false + // END + + let num_partitions = partition_data.len(); + + // Create base expression: hash_repartition(join_keys) % num_partitions + let routing_hash_expr = Arc::new(HashExpr::new( + self.on_right.clone(), + self.repartition_random_state.clone(), + "hash_repartition".to_string(), + )) + as Arc; + + let modulo_expr = Arc::new(BinaryExpr::new( + routing_hash_expr, + Operator::Modulo, + lit(ScalarValue::UInt64(Some(num_partitions as u64))), + )) + as Arc; + + // Create WHEN branches for each partition + let when_then_branches: Vec<( + Arc, + Arc, + )> = partitions + .iter() + .enumerate() + .filter_map(|(partition_id, partition_opt)| { + partition_opt.as_ref().and_then(|partition| { + // Skip empty partitions - they would always return false anyway + match &partition.pushdown { + PushdownStrategy::Empty => None, + _ => Some((partition_id, partition)), + } + }) + }) + .map(|(partition_id, partition)| -> Result<_> { + // WHEN partition_id + let when_expr = lit(ScalarValue::UInt64(Some( + partition_id as u64, + ))); + + // THEN: Combine bounds check AND membership predicate + // (should not be None due to filter_map above, but handle defensively) + let then_expr = build_partition_filter_expr( + &self.on_right, + partition, + &self.probe_schema, + )? + .unwrap_or_else(|| lit(true)); + + Ok((when_expr, then_expr)) + }) + .collect::>>()?; + + // Optimize for single partition: skip CASE expression entirely + let filter_expr = if when_then_branches.is_empty() { + // All partitions are empty: no rows can match + lit(false) + } else if when_then_branches.len() == 1 { + // Single partition: just use the condition directly + // since hash % 1 == 0 always, the WHEN 0 branch will always match + Arc::clone(&when_then_branches[0].1) + } else { + // Multiple partitions: create CASE expression + Arc::new(CaseExpr::try_new( + Some(modulo_expr), + when_then_branches, + Some(lit(false)), // ELSE false + )?) + as Arc + }; + + self.dynamic_filter.update(filter_expr)?; + } } } } diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs index 2cdfa1e6ac020..8121cc83bc0bd 100644 --- a/datafusion/physical-plan/src/joins/mod.rs +++ b/datafusion/physical-plan/src/joins/mod.rs @@ -21,7 +21,8 @@ use arrow::array::BooleanBufferBuilder; pub use cross_join::CrossJoinExec; use datafusion_physical_expr::PhysicalExprRef; pub use hash_join::{ - HashExpr, HashJoinExec, HashJoinExecBuilder, HashTableLookupExpr, SeededRandomState, + DynamicFilterRoutingMode, HashExpr, HashJoinExec, HashJoinExecBuilder, + HashTableLookupExpr, SeededRandomState, }; pub use nested_loop_join::{NestedLoopJoinExec, NestedLoopJoinExecBuilder}; use parking_lot::Mutex; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 67c6d5ae1671c..2b339e8a547c8 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1125,6 +1125,11 @@ enum PartitionMode { AUTO = 2; } +enum HashJoinDynamicFilterRoutingMode { + CASE_HASH = 0; + PARTITION_INDEX = 1; +} + message HashJoinExecNode { PhysicalPlanNode left = 1; PhysicalPlanNode right = 2; @@ -1135,6 +1140,7 @@ message HashJoinExecNode { JoinFilter filter = 8; repeated uint32 projection = 9; bool null_aware = 10; + HashJoinDynamicFilterRoutingMode dynamic_filter_routing_mode = 11; } enum StreamPartitionMode { @@ -1443,4 +1449,4 @@ message AsyncFuncExecNode { message BufferExecNode { PhysicalPlanNode input = 1; uint64 capacity = 2; -} \ No newline at end of file +} diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index b77060394feba..b6b3e3d97626b 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -8258,6 +8258,77 @@ impl<'de> serde::Deserialize<'de> for GroupingSetNode { deserializer.deserialize_struct("datafusion.GroupingSetNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for HashJoinDynamicFilterRoutingMode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let variant = match self { + Self::CaseHash => "CASE_HASH", + Self::PartitionIndex => "PARTITION_INDEX", + }; + serializer.serialize_str(variant) + } +} +impl<'de> serde::Deserialize<'de> for HashJoinDynamicFilterRoutingMode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "CASE_HASH", + "PARTITION_INDEX", + ]; + + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = HashJoinDynamicFilterRoutingMode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) + }) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "CASE_HASH" => Ok(HashJoinDynamicFilterRoutingMode::CaseHash), + "PARTITION_INDEX" => Ok(HashJoinDynamicFilterRoutingMode::PartitionIndex), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} impl serde::Serialize for HashJoinExecNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -8293,6 +8364,9 @@ impl serde::Serialize for HashJoinExecNode { if self.null_aware { len += 1; } + if self.dynamic_filter_routing_mode != 0 { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.HashJoinExecNode", len)?; if let Some(v) = self.left.as_ref() { struct_ser.serialize_field("left", v)?; @@ -8327,6 +8401,11 @@ impl serde::Serialize for HashJoinExecNode { if self.null_aware { struct_ser.serialize_field("nullAware", &self.null_aware)?; } + if self.dynamic_filter_routing_mode != 0 { + let v = HashJoinDynamicFilterRoutingMode::try_from(self.dynamic_filter_routing_mode) + .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.dynamic_filter_routing_mode)))?; + struct_ser.serialize_field("dynamicFilterRoutingMode", &v)?; + } struct_ser.end() } } @@ -8350,6 +8429,8 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { "projection", "null_aware", "nullAware", + "dynamic_filter_routing_mode", + "dynamicFilterRoutingMode", ]; #[allow(clippy::enum_variant_names)] @@ -8363,6 +8444,7 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { Filter, Projection, NullAware, + DynamicFilterRoutingMode, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -8393,6 +8475,7 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { "filter" => Ok(GeneratedField::Filter), "projection" => Ok(GeneratedField::Projection), "nullAware" | "null_aware" => Ok(GeneratedField::NullAware), + "dynamicFilterRoutingMode" | "dynamic_filter_routing_mode" => Ok(GeneratedField::DynamicFilterRoutingMode), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -8421,6 +8504,7 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { let mut filter__ = None; let mut projection__ = None; let mut null_aware__ = None; + let mut dynamic_filter_routing_mode__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Left => { @@ -8480,6 +8564,12 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { } null_aware__ = Some(map_.next_value()?); } + GeneratedField::DynamicFilterRoutingMode => { + if dynamic_filter_routing_mode__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilterRoutingMode")); + } + dynamic_filter_routing_mode__ = Some(map_.next_value::()? as i32); + } } } Ok(HashJoinExecNode { @@ -8492,6 +8582,7 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { filter: filter__, projection: projection__.unwrap_or_default(), null_aware: null_aware__.unwrap_or_default(), + dynamic_filter_routing_mode: dynamic_filter_routing_mode__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index e95cddcc2c612..cc56cded04c64 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1708,6 +1708,8 @@ pub struct HashJoinExecNode { pub projection: ::prost::alloc::vec::Vec, #[prost(bool, tag = "10")] pub null_aware: bool, + #[prost(enumeration = "HashJoinDynamicFilterRoutingMode", tag = "11")] + pub dynamic_filter_routing_mode: i32, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct SymmetricHashJoinExecNode { @@ -2367,6 +2369,32 @@ impl PartitionMode { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] +pub enum HashJoinDynamicFilterRoutingMode { + CaseHash = 0, + PartitionIndex = 1, +} +impl HashJoinDynamicFilterRoutingMode { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::CaseHash => "CASE_HASH", + Self::PartitionIndex => "PARTITION_INDEX", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "CASE_HASH" => Some(Self::CaseHash), + "PARTITION_INDEX" => Some(Self::PartitionIndex), + _ => None, + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] pub enum StreamPartitionMode { SinglePartition = 0, PartitionedExec = 1, diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 60d8b5705bf35..38c3436f3eb8e 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -68,8 +68,9 @@ use datafusion_physical_plan::expressions::PhysicalSortExpr; use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder}; use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter}; use datafusion_physical_plan::joins::{ - CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec, - StreamJoinPartitionMode, SymmetricHashJoinExec, + CrossJoinExec, DynamicFilterRoutingMode, HashJoinExec, HashJoinExecBuilder, + NestedLoopJoinExec, PartitionMode, SortMergeJoinExec, StreamJoinPartitionMode, + SymmetricHashJoinExec, }; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::memory::LazyMemoryExec; @@ -1355,6 +1356,24 @@ impl protobuf::PhysicalPlanNode { protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned, protobuf::PartitionMode::Auto => PartitionMode::Auto, }; + let dynamic_filter_routing_mode = + protobuf::HashJoinDynamicFilterRoutingMode::try_from( + hashjoin.dynamic_filter_routing_mode, + ) + .map_err(|_| { + proto_error(format!( + "Received a HashJoinNode message with unknown HashJoinDynamicFilterRoutingMode {}", + hashjoin.dynamic_filter_routing_mode + )) + })?; + let dynamic_filter_routing_mode = match dynamic_filter_routing_mode { + protobuf::HashJoinDynamicFilterRoutingMode::CaseHash => { + DynamicFilterRoutingMode::CaseHash + } + protobuf::HashJoinDynamicFilterRoutingMode::PartitionIndex => { + DynamicFilterRoutingMode::PartitionIndex + } + }; let projection = if !hashjoin.projection.is_empty() { Some( hashjoin @@ -1366,17 +1385,16 @@ impl protobuf::PhysicalPlanNode { } else { None }; - Ok(Arc::new(HashJoinExec::try_new( - left, - right, - on, - filter, - &join_type.into(), - projection, - partition_mode, - null_equality.into(), - hashjoin.null_aware, - )?)) + Ok(Arc::new( + HashJoinExecBuilder::new(left, right, on, join_type.into()) + .with_filter(filter) + .with_projection(projection) + .with_partition_mode(partition_mode) + .with_null_equality(null_equality.into()) + .with_null_aware(hashjoin.null_aware) + .with_dynamic_filter_routing_mode(dynamic_filter_routing_mode) + .build()?, + )) } fn try_into_symmetric_hash_join_physical_plan( @@ -2405,6 +2423,14 @@ impl protobuf::PhysicalPlanNode { PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned, PartitionMode::Auto => protobuf::PartitionMode::Auto, }; + let dynamic_filter_routing_mode = match exec.dynamic_filter_routing_mode { + DynamicFilterRoutingMode::CaseHash => { + protobuf::HashJoinDynamicFilterRoutingMode::CaseHash + } + DynamicFilterRoutingMode::PartitionIndex => { + protobuf::HashJoinDynamicFilterRoutingMode::PartitionIndex + } + }; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new( @@ -2420,6 +2446,7 @@ impl protobuf::PhysicalPlanNode { v.iter().map(|x| *x as u32).collect::>() }), null_aware: exec.null_aware, + dynamic_filter_routing_mode: dynamic_filter_routing_mode.into(), }, ))), }) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index e31cdbe0aad23..beedd5ba1632e 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -450,7 +450,7 @@ datafusion.optimizer.max_passes 3 Number of times that the optimizer will attemp datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory -datafusion.optimizer.preserve_file_partitions 0 Minimum number of distinct partition values required to group files by their Hive partition column values (enabling Hash partitioning declaration). How the option is used: - preserve_file_partitions=0: Disable it. - preserve_file_partitions=1: Always enable it. - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N. This threshold preserves I/O parallelism when file partitioning is below it. Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct partitions is less than the target_partitions. +datafusion.optimizer.preserve_file_partitions 0 Minimum number of distinct partition values required to group files by their Hive partition column values (enabling Hash partitioning declaration). How the option is used: - preserve_file_partitions=0: Disable it. - preserve_file_partitions=1: Always enable it. - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N. This threshold preserves I/O parallelism when file partitioning is below it. Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct partitions is less than the target_partitions. Note for partitioned hash join dynamic filtering: preserving file partitions can allow partition-index routing (`i -> i`) instead of CASE-hash routing, but this assumes build/probe partition indices stay aligned for dynamic filter consumers. datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning. datafusion.optimizer.repartition_file_scans true When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism. This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition). For FileSources, only Parquet and CSV formats are currently supported. If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't happen within a single file. If set to `true` for an in-memory source, all memtable's partitions will have their batches repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change the total number of partitions and batches per partition, but does not slice the initial record tables provided to the MemTable on creation. diff --git a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt index 297094fab16e7..982ddd55e9698 100644 --- a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt +++ b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt @@ -664,8 +664,8 @@ C prod 2017.6 # TEST 12: Partitioned Join with Matching Partition Counts - With Optimization # Both tables have 3 partitions matching target_partitions=3 # No RepartitionExec needed for join - partitions already satisfy the requirement -# Dynamic filter pushdown is disabled in this mode because preserve_file_partitions -# reports Hash partitioning for Hive-style file groups, which are not hash-routed. +# Dynamic filter pushdown uses partition-indexed routing because preserve_file_partitions +# reports Hash partitioning for Hive-style file groups that are not hash-routed. ########## statement ok @@ -692,7 +692,7 @@ physical_plan 04)------ProjectionExec: expr=[value@1 as value, f_dkey@2 as f_dkey, env@0 as env] 05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[env@0, value@2, f_dkey@3] 06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet -07)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet +07)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet, predicate=DynamicFilter [ empty ] query TTR rowsort SELECT f.f_dkey, d.env, sum(f.value) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index aaba453b3541f..2ff1426d99bdb 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -147,7 +147,7 @@ The following configuration settings are available: | datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | | datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. | | datafusion.optimizer.repartition_file_scans | true | When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism. This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition). For FileSources, only Parquet and CSV formats are currently supported. If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't happen within a single file. If set to `true` for an in-memory source, all memtable's partitions will have their batches repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change the total number of partitions and batches per partition, but does not slice the initial record tables provided to the MemTable on creation. | -| datafusion.optimizer.preserve_file_partitions | 0 | Minimum number of distinct partition values required to group files by their Hive partition column values (enabling Hash partitioning declaration). How the option is used: - preserve_file_partitions=0: Disable it. - preserve_file_partitions=1: Always enable it. - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N. This threshold preserves I/O parallelism when file partitioning is below it. Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct partitions is less than the target_partitions. | +| datafusion.optimizer.preserve_file_partitions | 0 | Minimum number of distinct partition values required to group files by their Hive partition column values (enabling Hash partitioning declaration). How the option is used: - preserve_file_partitions=0: Disable it. - preserve_file_partitions=1: Always enable it. - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N. This threshold preserves I/O parallelism when file partitioning is below it. Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct partitions is less than the target_partitions. Note for partitioned hash join dynamic filtering: preserving file partitions can allow partition-index routing (`i -> i`) instead of CASE-hash routing, but this assumes build/probe partition indices stay aligned for dynamic filter consumers. | | datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level | | datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` | | datafusion.optimizer.subset_repartition_threshold | 4 | Partition count threshold for subset satisfaction optimization. When the current partition count is >= this threshold, DataFusion will skip repartitioning if the required partitioning expression is a subset of the current partition expression such as Hash(a) satisfies Hash(a, b). When the current partition count is < this threshold, DataFusion will repartition to increase parallelism even when subset satisfaction applies. Set to 0 to always repartition (disable subset satisfaction optimization). Set to a high value to always use subset satisfaction. Example (subset_repartition_threshold = 4): `text Hash([a]) satisfies Hash([a, b]) because (Hash([a, b]) is subset of Hash([a]) If current partitions (3) < threshold (4), repartition: AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)] RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3 AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3) If current partitions (8) >= threshold (4), use subset satisfaction: AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8) ` |