Gene.bordegaray/2026/02/dyn filter partition indexed #20246
Conversation
NGA-TRAN left a comment
The PR makes sense. My main comments:
- You may want to add documentation about pruning and filtering and why we need the partition index, if it is not already documented somewhere.
- I think there is a typo in the test data.
- I am unclear whether recursively searching for RepartitionExec is the best strategy and whether it will introduce new bugs. Some examples and an explanation would help.
- I would ask someone who knows dynamic filtering well to review this. Adrian and Lia? Maybe they can help explain the recursive walk, too.
datafusion/sqllogictest/test_files/preserve_file_partitioning.slt
/// Determines whether partition-index routing should be used instead of CASE hash routing.
///
/// Partition-index routing is enabled when:
/// 1. The join is in `Partitioned` mode
I wonder whether it would be useful to add a comment here saying that we do not have a problem with CollectLeft when there are many partitions on the probe side. That is because there is only one hash table, and it is used for pruning and filtering across all partitions of the probe side.
I forgot to add this, will do 👀
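For context, here is a minimal sketch of what such a note could look like on the routing check. `PartitionMode` is DataFusion's real enum, but the function name and the reduced signature are assumptions; the PR's actual check also inspects the plan shape.

```rust
use datafusion::physical_plan::joins::PartitionMode;

/// Sketch only: determines whether partition-index routing should be used
/// instead of CASE hash routing (the real check also looks at the probe side).
///
/// Note on `CollectLeft`: it does not need partition-index routing, because the
/// build side produces a single hash table whose dynamic filter is applied to
/// every probe-side partition, so there is nothing to route per partition.
fn use_partition_index_routing(mode: &PartitionMode) -> bool {
    matches!(mode, PartitionMode::Partitioned)
}
```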
        }
    }
    false
}
I wonder if this is the right walk. Should we only check whether there is a RepartitionExec directly before the join? Could we introduce bugs here? Maybe drawing some examples on paper will help you confirm whether this is correct.
No, I don't believe so, because there can be many operators between the join and the DataSourceExec. All that matters is whether there is some RepartitionExec in between (in which case we want to use CASE) or not.
I agree there can be many operators between the join and the DataSourceExec, but is it always the case that any RepartitionExec we find is there for the join? What happens if that repartition is for a group-by rather than the join?
I do not know the details of how dynamic filtering is implemented, but this recursive walk worries me. Is there a way to identify that the RepartitionExec is for the join, e.g. that it repartitions on the join key?
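For readers following the thread, here is a minimal sketch of the kind of recursive walk under discussion. This is not the PR's actual code; the function name is an assumption and the import paths may differ between DataFusion versions.

```rust
use std::sync::Arc;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical sketch: returns true if any RepartitionExec appears anywhere
/// below `plan`. Finding one means the original file partitioning is no longer
/// preserved, so the join would fall back to CASE hash routing. The reviewer's
/// concern is that this walk cannot tell a repartition inserted for the join
/// apart from one inserted for, say, an aggregation.
fn contains_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
    if plan.as_any().downcast_ref::<RepartitionExec>().is_some() {
        return true;
    }
    plan.children().iter().any(|child| contains_repartition(child))
}
```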
I wonder if there is a simpler way to know whether we are preserving file partitioning. If we are, I'd say we should store this optimizer decision in the HashJoinExec node instead of recursing through the plan, similar to how we store the PartitionMode in HashJoinExec to make decisions during execution. wdyt?
I can look into this, but at first glance I think this is a great suggestion.
I added the logic in enforce_distribution.rs, but it is a bit more involved than the decision for a partitioned hash join. I have added documentation explaining the cases and the method used.
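For illustration, a rough sketch of the "store the decision on the node" idea discussed above. The names below are hypothetical and are not DataFusion's actual API.

```rust
/// Hypothetical sketch, in the spirit of how `PartitionMode` is stored on
/// `HashJoinExec`: the physical optimizer (e.g. in enforce_distribution.rs)
/// records whether the probe side preserves the original file partitioning,
/// and execution reads that flag instead of re-walking the plan.
struct HashJoinRoutingInfo {
    preserves_file_partitioning: bool,
}

impl HashJoinRoutingInfo {
    fn use_partition_index_routing(&self) -> bool {
        self.preserves_file_partitioning
    }
}
```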
// Probe side: each partition has matching and non-matching rows.
// Partition 0: ("aa","ba",10.0) matches p0, ("zz","zz",20.0) does not match p0
// Partition 1: ("zz","zz",30.0) matches p1, ("aa","ba",40.0) does not match p1
For your tests, I think this data is OK. However, it is not clear in the context of a partitioned hash join. You may want data that clearly defines the partitions for both the build and probe sides, and to make the build side smaller and more scattered, so you can see data from the probe side being filtered.
Which issue does this PR close?
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?