Compute Dynamic Filters only when a consumer supports them #19546
I like this! The advantages over #19387 are:
- No API change / breaking changes
- Less code churn for us and users
- Complexity is contained within dynamic filters and even there within producers
- Should work for distributed systems (whatever is broadcasting updates to filters will also need to hold onto a reference to the dynamic filter)
This also means that if we run into issues with this approach it's easy to back out of 😄
Is there any way we can add a test showing that if there are no downstream consumers we don't compute the filters?
I added should still return
    }

    #[tokio::test]
    async fn test_hashjoin_dynamic_filter_pushdown_not_used() {
Is the output of this test different before and after this PR? Or would the result be the same as long as the probe side is built with .with_support(false), i.e. it does not support dynamic filtering? I'm okay merging this with only unit tests if it's hard to see the change in action from an integration test, but I think we should be clear about that.
Mmm, you're right: it would behave the same if we added this test on main. I overlooked that. To test this end to end I was basically just printing the strong count inside is_used, and saw 1 with .with_support(false) and 2 with .with_support(true). To test it properly we would have to make the dynamic_filter field inside HashJoinExec public and then check is_used after optimization, but I don't think we want to make dynamic_filter public.
I agree, it's better to keep it private and just rely on the unit test. It's hard to test that a transparent optimization does not kick in...
I created an accessor to dynamic_filter intended for use in testing only; the test now asserts directly on is_used. WDYT? Or should we just keep the unit tests?
adriangb left a comment
Maybe append to the same test the positive case (probe side does support pushdown, is_used is true) just to prove the point? Could even be in a loop to avoid code duplication.
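A rough sketch of what that looped test could look like, using only std types. ToyDynamicFilter, ToyProbe and push_down are hypothetical stand-ins invented for illustration; they are not DataFusion's HashJoinExec or test helpers, and the "supports" flag only mimics what .with_support(...) controls in the real tests.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the dynamic filter; only its Arc count matters.
struct ToyDynamicFilter;

/// Hypothetical probe-side node that may or may not keep the filter.
struct ToyProbe {
    retained: Option<Arc<ToyDynamicFilter>>,
}

/// Mimics pushdown: a probe that supports dynamic filters keeps a clone.
fn push_down(filter: &Arc<ToyDynamicFilter>, supports: bool) -> ToyProbe {
    let retained = if supports {
        Some(Arc::clone(filter))
    } else {
        None
    };
    ToyProbe { retained }
}

/// Runs the negative and the positive case in one loop and returns
/// (supports, is_used) for each.
fn check_both_cases() -> Vec<(bool, bool)> {
    let mut results = Vec::new();
    for &supports in &[false, true] {
        let filter = Arc::new(ToyDynamicFilter);
        let probe = push_down(&filter, supports);
        // Same idea as is_used(): more than one strong reference means a consumer exists.
        let used = Arc::strong_count(&filter) > 1;
        assert_eq!(probe.retained.is_some(), supports);
        assert_eq!(used, supports, "is_used should follow probe-side support");
        results.push((supports, used));
    }
    results
}
```

Keeping both cases in one loop means the only thing that varies between iterations is the support flag, which makes the intent of the assertion easier to read.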
    let _consumer = Arc::clone(&dynamic_filter)
        .with_new_children(vec![])
        .unwrap();
I had to add a consumer in these tests; otherwise is_used returns false, no filters are computed, and wait_complete never returns. I will add an is_used check inside wait_complete as well. I can't imagine this ever happening (unless we call wait_complete on a probe node that does not accept dynamic filters, which would be incorrect usage), but it's worth adding just in case.
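To make the hang and the proposed guard concrete, here is a minimal std-only sketch. ToyFilter, produce, wait_complete and run are hypothetical names for illustration, not DataFusion's implementation; the sketch assumes the producer skips its work whenever no consumer holds a reference, and that the guard simply returns early in that case.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical filter state: a completion flag plus a condition variable.
struct ToyFilter {
    complete: Mutex<bool>,
    ready: Condvar,
}

fn new_filter() -> Arc<ToyFilter> {
    Arc::new(ToyFilter {
        complete: Mutex::new(false),
        ready: Condvar::new(),
    })
}

/// More than one strong reference means a consumer is holding the filter.
fn is_used(filter: &Arc<ToyFilter>) -> bool {
    Arc::strong_count(filter) > 1
}

/// Producer side: skip the work (and never mark completion) without a consumer.
fn produce(filter: &Arc<ToyFilter>) {
    if !is_used(filter) {
        return;
    }
    *filter.complete.lock().unwrap() = true;
    filter.ready.notify_all();
}

/// Returns true after observing completion, or false if it bailed out early
/// because nobody uses the filter. Without the early return, the loop below
/// would block forever in the no-consumer case.
fn wait_complete(filter: &Arc<ToyFilter>) -> bool {
    if !is_used(filter) {
        return false;
    }
    let mut done = filter.complete.lock().unwrap();
    while !*done {
        done = filter.ready.wait(done).unwrap();
    }
    true
}

/// Runs producer then waiter, with or without a consumer reference alive.
fn run(with_consumer: bool) -> bool {
    let filter = new_filter();
    let _consumer = if with_consumer {
        Some(Arc::clone(&filter))
    } else {
        None
    };
    produce(&filter);
    wait_complete(&filter)
}
```

With a consumer present, run(true) observes completion; without one, run(false) returns early instead of waiting on a flag that is never set.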
Which issue does this PR close?
Closes #17527
Rationale for this change
Currently, DataFusion computes bounds for all queries that contain a HashJoinExec node whenever the option enable_dynamic_filter_pushdown is set to true (default). It might make sense to compute these bounds only when we explicitly know there is a consumer that will use them.
What changes are included in this PR?
As suggested in #17527 (comment), this PR adds an is_used() method to DynamicFilterPhysicalExpr that checks if any consumers are holding a reference to the filter using Arc::strong_count().
During filter pushdown, consumers that accept the filter and use it later in execution have to retain a reference to the filter's Arc. Scan nodes such as ParquetSource are one example.
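The reference-counting idea can be illustrated in isolation. This is a sketch under assumptions, not the PR's code: ToyDynamicFilter is a hypothetical empty type, is_used is written as a free function, and the "greater than 1" threshold is inferred from the strong counts of 1 and 2 reported in the review thread.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for DynamicFilterPhysicalExpr; only the count matters.
struct ToyDynamicFilter;

/// The producer always holds one reference, so any extra one is a consumer.
fn is_used(filter: &Arc<ToyDynamicFilter>) -> bool {
    Arc::strong_count(filter) > 1
}

/// Checks is_used with the producer alone, with a consumer attached,
/// and again after that consumer is dropped.
fn lifecycle() -> [bool; 3] {
    let producer = Arc::new(ToyDynamicFilter);
    let before = is_used(&producer);

    // A consumer that accepts the filter retains a clone of the Arc.
    let consumer = Arc::clone(&producer);
    let during = is_used(&producer);

    // Once the consumer goes away, the filter is unused again.
    drop(consumer);
    let after = is_used(&producer);

    [before, during, after]
}
```

Because the check relies purely on who holds a clone, it needs no extra bookkeeping, but it also means any stray clone (for example in a test) counts as a consumer.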
Are these changes tested?
I added a unit test in dynamic_filters.rs (test_is_used) that verifies the Arc reference counting behavior.
Existing integration tests in datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs validate the end-to-end behavior. These tests verify that dynamic filters are computed and filled when consumers are present.
Are there any user-facing changes?
A new is_used() method on DynamicFilterPhysicalExpr.