feat: Push limit into hash join #20228
Conversation
jonathanc-n left a comment
run benchmarks
🤖: Benchmark completed Details

run benchmark tpcds
(not sure if there is any query benefiting from this in tpch / tpcds, but those contain joins at least)
🤖: Benchmark completed Details

run benchmark tpch
🤖: Benchmark completed Details
|
2010YOUY01 left a comment
Thank you! The implementation looks very straightforward to me.
I suggest constructing a query that favors this optimization and verifying that the performance improvement is measurable. I don't expect any surprises; this is just to be safe.
```rust
        self.fetch
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
```
Is this method really needed?
If it is not, then I'd suggest removing it.
If the HashJoinExec has already been executed, then setting a new limit will be confusing/inconsistent unless it is re-executed.
Alternatively it could be implemented as:

```rust
HashJoinExecBuilder::from(self)
    .with_fetch(limit)
    .build()
    .ok()
    .map(|exec| Arc::new(exec) as _)
```

This way it won't keep the calculated state.
It is needed; the optimizer uses it to pass the limit to the exec node. `with_fetch()` will only be called at plan time for limit pushdown, so the current solution is fine. I will change it to use the HashJoinExec builder.
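The plan-time contract described here can be sketched as follows. This is a hypothetical, heavily simplified stand-in (the trait, struct, and `push_limit` pass are illustrative, not DataFusion's actual `ExecutionPlan` API): the optimizer calls `with_fetch` while rewriting the plan tree, and the operator returns a fresh node so no computed runtime state is carried over.

```rust
use std::sync::Arc;

// Hypothetical, minimal stand-ins for the real traits; not DataFusion's API.
trait ExecutionPlan {
    fn fetch(&self) -> Option<usize>;
    // Returns a new plan with the limit applied, or None if unsupported.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>;
}

struct HashJoinExec {
    fetch: Option<usize>,
}

impl ExecutionPlan for HashJoinExec {
    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
    // Called only at plan time: build a fresh node so no computed runtime
    // state leaks into the limited plan.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(HashJoinExec { fetch: limit }))
    }
}

// A limit-pushdown pass rewrites the tree by calling `with_fetch`.
fn push_limit(plan: &dyn ExecutionPlan, limit: usize) -> Option<Arc<dyn ExecutionPlan>> {
    plan.with_fetch(Some(limit))
}

fn main() {
    let join = HashJoinExec { fetch: None };
    let limited = push_limit(&join, 5).expect("hash join supports fetch");
    assert_eq!(limited.fetch(), Some(5));
    println!("limit pushed: {:?}", limited.fetch());
}
```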
Co-authored-by: Yongting You <2010youy01@gmail.com>
2010YOUY01 left a comment
I've profiled it on a target workload, and verified the expected improvement 🚀
PR:

```
> select *
  from
    generate_series(0, 100000000, 8192) as t_build(v1)
  inner join
    generate_series(100000000) as t_probe(v2)
  on t_build.v1 = t_probe.v2
  limit 5;
+--------+--------+
| v1     | v2     |
+--------+--------+
| 49152  | 49152  |
| 57344  | 57344  |
| 98304  | 98304  |
| 131072 | 131072 |
| 253952 | 253952 |
+--------+--------+
5 row(s) fetched.
Elapsed 0.009 seconds.
```
main:

```
> select *
  from
    generate_series(0, 100000000, 8192) as t_build(v1)
  inner join
    generate_series(100000000) as t_probe(v2)
  on t_build.v1 = t_probe.v2
  limit 5;
+--------+--------+
| v1     | v2     |
+--------+--------+
| 8192   | 8192   |
| 221184 | 221184 |
| 237568 | 237568 |
| 319488 | 319488 |
| 376832 | 376832 |
+--------+--------+
5 row(s) fetched.
Elapsed 0.471 seconds.
```
```rust
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        HashJoinExecBuilder::from(self)
```
I don't get the purpose of this builder, or why we have to do a round trip to set this fetch.
Also, the comment on HashJoinExecBuilder lacks further explanation. This API feels quite subtle: it resets several runtime fields (for example, the dynamic filter), which can be surprising. I would expect more documentation explaining the intended usage, invariants, and any safety considerations when using it.
Could we do a direct clone, like:

```rust
Self {
    fetch: limit,
    // ...explicitly reset non-cloneable runtime contents
    ..self.clone()
}
```

(This is my major concern; once clarified, I think this PR is good to go)
@2010YOUY01 See my concern at #20228 (comment)
I think there should be some docs for it. Filed in #20270
I think it would be better to make those resets explicit and avoid builders with hidden semantics.
However this pattern has already propagated in the codebase, so I'd like to continue the discussion in the original PR and see whether we can clean this up or remove such patterns over time: #19893 (comment)
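The explicit-reset clone suggested above can be sketched with Rust's struct-update syntax. The struct and field names here are hypothetical (stand-ins for a join node's plan-time config and computed runtime state); the point is that every reset is visible at the call site rather than hidden inside a builder.

```rust
#[derive(Clone)]
struct JoinNode {
    fetch: Option<usize>,
    on: String,            // plan-time configuration, safe to clone
    cached_rows: Vec<u64>, // computed runtime state, must be reset
}

impl JoinNode {
    fn with_fetch(&self, limit: Option<usize>) -> Self {
        Self {
            fetch: limit,
            // Explicit reset, visible right here at the call site.
            cached_rows: Vec::new(),
            // Everything else is copied verbatim from the existing node.
            ..self.clone()
        }
    }
}

fn main() {
    let node = JoinNode {
        fetch: None,
        on: "v1 = v2".into(),
        cached_rows: vec![1, 2, 3],
    };
    let limited = node.with_fetch(Some(5));
    assert_eq!(limited.fetch, Some(5));
    assert_eq!(limited.on, "v1 = v2");      // config preserved
    assert!(limited.cached_rows.is_empty()); // runtime state reset
    println!("ok");
}
```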
The builder was added to provide the ability to build HashJoinExec using:
- a projection: `Option<Vec<usize>>` (to not break the existing `try_new` API)
- a shared projection `Option<ProjectionRef>` (to share the projection when an exec is created from another one and the projection is preserved)

The reason not to use `impl Option<Into<ProjectionRef>>` as a `try_new` arg here: it requires specifying the None type explicitly when passing None. The reason not to add another `try_new_with_shared_projection`: it looks confusing, as we don't want to create a new constructor every time some arg needs to be either one thing or another.
If we keep the builder, to avoid hidden semantics as much as possible, it should keep all existing fields when constructed from an existing exec, while allowing customization of the things that must be reset. I will make a patch for it.
Which issue does this PR close?
Rationale for this change
What changes are included in this PR?
Push the limit down into the hash join using the limit pushdown optimizer: the optimizer passes the limit value to the hash join exec via `with_fetch`, and the `fetch` value is passed to the `LimitedBatchCoalescer` so it emits the batch once the limit is hit.
Are these changes tested?
SLT tests + unit tests
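The limit-hit behavior described above (stop emitting once `fetch` rows have been produced) can be sketched as follows. This is an illustrative model, not DataFusion's actual coalescer API; the struct, `push`, and `limit_reached` names are hypothetical.

```rust
// Sketch of a limit-aware batch coalescer: truncates the current batch at
// the fetch boundary and produces nothing after the limit is reached.
struct LimitedCoalescer {
    fetch: Option<usize>, // None means no limit
    emitted: usize,       // rows emitted so far
}

impl LimitedCoalescer {
    fn new(fetch: Option<usize>) -> Self {
        Self { fetch, emitted: 0 }
    }

    /// Push a batch of rows; returns the (possibly truncated) batch to emit,
    /// or None once the limit has already been reached.
    fn push(&mut self, batch: Vec<u64>) -> Option<Vec<u64>> {
        match self.fetch {
            Some(fetch) if self.emitted >= fetch => None, // limit hit: stop
            Some(fetch) => {
                let remaining = fetch - self.emitted;
                let out: Vec<u64> = batch.into_iter().take(remaining).collect();
                self.emitted += out.len();
                Some(out)
            }
            None => {
                self.emitted += batch.len();
                Some(batch)
            }
        }
    }

    fn limit_reached(&self) -> bool {
        self.fetch.map_or(false, |f| self.emitted >= f)
    }
}

fn main() {
    let mut c = LimitedCoalescer::new(Some(5));
    assert_eq!(c.push(vec![1, 2, 3]), Some(vec![1, 2, 3]));
    assert_eq!(c.push(vec![4, 5, 6, 7]), Some(vec![4, 5])); // truncated at limit
    assert!(c.limit_reached());
    assert_eq!(c.push(vec![8]), None); // nothing emitted after the limit
    println!("ok");
}
```

With this shape, the join's output stream can terminate early instead of draining the whole probe side, which matches the 0.471s → 0.009s improvement measured above.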