@LiaCastaneda
Contributor

Which issue does this PR close?

Closes #17527

Rationale for this change

Currently, DataFusion computes bounds for all queries that contain a HashJoinExec node whenever the option enable_dynamic_filter_pushdown is set to true (default). It might make sense to compute these bounds only when we explicitly know there is a consumer that will use them.

What changes are included in this PR?

As suggested in #17527 (comment), this PR adds an is_used() method to DynamicFilterPhysicalExpr that checks if any consumers are holding a reference to the filter using Arc::strong_count().

During filter pushdown, consumers that accept the filter and use it later in execution (for example, scan nodes like ParquetSource) have to retain a reference to the Arc.
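
A minimal sketch of the reference-counting idea, not the PR's actual code: SketchFilter and Inner are hypothetical stand-ins for DynamicFilterPhysicalExpr and its shared state. It assumes the producer keeps one handle and every consumer that accepts the filter keeps a clone, so a strong count above 1 means someone downstream is holding on.

```rust
use std::sync::Arc;

/// Hypothetical shared state behind the filter (empty for this sketch).
#[derive(Debug, Default)]
struct Inner;

/// Hypothetical, simplified filter handle. Cloning it clones the inner Arc,
/// which is what a consumer does when it accepts the filter.
#[derive(Debug, Clone, Default)]
struct SketchFilter {
    inner: Arc<Inner>,
}

impl SketchFilter {
    fn new() -> Self {
        Self::default()
    }

    /// True when at least one handle other than this one is alive.
    fn is_used(&self) -> bool {
        Arc::strong_count(&self.inner) > 1
    }
}
```

With only the producer's handle alive, is_used() is false; once a consumer clones the handle it becomes true, and it goes back to false when the consumer drops its clone.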

Are these changes tested?

I added a unit test in dynamic_filters.rs (test_is_used) that verifies the Arc reference counting behavior.
Existing integration tests in datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs validate the end-to-end behavior. These tests verify that dynamic filters are computed and filled when consumers are present.

Are there any user-facing changes?

A new is_used() method on DynamicFilterPhysicalExpr.

@github-actions github-actions bot added physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate labels Dec 29, 2025
@LiaCastaneda
Contributor Author

This is another (desired) alternative to #18938
cc @adriangb this PR implements the is_used approach.

@LiaCastaneda LiaCastaneda marked this pull request as ready for review December 29, 2025 14:14
Contributor

@adriangb adriangb left a comment

I like this! The advantages over #19387 are:

  • No API change / breaking changes
  • Less code churn for us and users
  • Complexity is contained within dynamic filters and even there within producers
  • Should work for distributed systems (whatever is broadcasting updates to filters will also need to hold onto a reference to the dynamic filter)

This also means that if we run into issues with this approach it's easy to back out of 😄

Is there any way we can add a test showing that if there are no downstream consumers we don't compute the filters?

@github-actions github-actions bot added the core Core DataFusion crate label Dec 29, 2025
@LiaCastaneda
Contributor Author

LiaCastaneda commented Dec 29, 2025

Is there any way we can add a test showing that if there are no downstream consumers we don't compute the filters?

I added test_hashjoin_dynamic_filter_pushdown_not_used, which creates a TestScanNode with support == false on the probe side and turns enable_dynamic_filter_pushdown on, so this variable

let enable_dynamic_filter_pushdown = context
    .session_config()
    .options()
    .optimizer
    .enable_join_dynamic_filter_pushdown
    && self
        .dynamic_filter
        .as_ref()
        .map(|df| df.filter.is_used())
        .unwrap_or(false);

should still evaluate to false even though the config option is enabled.
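
As a rough illustration of that gating, here is a standalone sketch. FilterExpr and should_compute_bounds are hypothetical names, and the real code reads the flag from the session config rather than taking a bool. The point is that both conditions must hold: the option is on, and some consumer holds a reference.

```rust
use std::sync::Arc;

/// Hypothetical marker type standing in for the real filter expression.
struct FilterExpr;

/// Compute bounds only if the option is enabled AND a consumer holds a
/// reference besides the one owned by the join itself.
fn should_compute_bounds(config_enabled: bool, filter: Option<&Arc<FilterExpr>>) -> bool {
    config_enabled
        && filter
            .map(|f| Arc::strong_count(f) > 1)
            .unwrap_or(false)
}
```

A join with no filter at all, or a filter nobody cloned, yields false regardless of the config flag.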

}

#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown_not_used() {
Contributor

Is the output / result of this test different before / after this PR? Or would the result be the same as long as .with_support(false) // probe side does not support dynamic filtering? I'm okay merging this with only unit tests if it's hard to see the change in action from an integration test, but I think we should be clear about that.

Contributor Author

Hmm, you're right: it would behave the same if we added this test on main. I overlooked that. To test this end to end, I was basically just printing the strong count inside is_used, and I saw 1 with .with_support(false) and 2 with .with_support(true). To test it properly we would have to make the dynamic_filter field inside HashJoinExec public and then check is_used after optimization, but I don't think we want to make dynamic_filter public.

Contributor

I agree, it's better to keep it private and just rely on the unit test. It's hard to test that a transparent optimization does not kick in...

Contributor Author

@LiaCastaneda LiaCastaneda Dec 29, 2025

I created an accessor for dynamic_filter that is intended for testing only, and the test now asserts directly on is_used. WDYT? Or should we just keep the unit tests?

Contributor

@adriangb adriangb left a comment

Maybe append to the same test the positive case (probe side does support pushdown, is_used is true) just to prove the point? Could even be in a loop to avoid code duplication.
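
One way the looped check could look, as a self-contained sketch rather than the actual DataFusion test: FilterExpr, ProbeScan, and check_both_cases are hypothetical, and ProbeScan only imitates a probe-side node that keeps the filter when it supports pushdown.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the dynamic filter expression.
struct FilterExpr;

/// Hypothetical probe-side scan: keeps a reference only if it supports pushdown.
struct ProbeScan {
    held: Option<Arc<FilterExpr>>,
}

impl ProbeScan {
    fn new(supports_pushdown: bool, filter: &Arc<FilterExpr>) -> Self {
        Self {
            held: supports_pushdown.then(|| Arc::clone(filter)),
        }
    }

    fn holds_filter(&self) -> bool {
        self.held.is_some()
    }
}

/// Same check as the earlier discussion: more than one strong reference.
fn is_used(filter: &Arc<FilterExpr>) -> bool {
    Arc::strong_count(filter) > 1
}

/// Runs the negative and positive case in one loop and returns
/// (supports_pushdown, is_used) pairs for inspection.
fn check_both_cases() -> Vec<(bool, bool)> {
    let mut observed = Vec::new();
    for supports_pushdown in [false, true] {
        let filter = Arc::new(FilterExpr);
        let scan = ProbeScan::new(supports_pushdown, &filter);
        assert_eq!(scan.holds_filter(), supports_pushdown);
        observed.push((supports_pushdown, is_used(&filter)));
    }
    observed
}
```

Each iteration builds a fresh filter, so the two cases cannot influence each other's reference counts.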

Comment on lines +4639 to +4641
let _consumer = Arc::clone(&dynamic_filter)
    .with_new_children(vec![])
    .unwrap();
Contributor Author

@LiaCastaneda LiaCastaneda Dec 29, 2025

I had to add a consumer in these tests; otherwise is_used returns false, no filters are computed, and wait_complete never returns. I will add an is_used check inside wait_complete as well. I can't imagine this ever happening (unless we call wait_complete on a probe node that does not accept dynamic filters, which would be wrong usage), but it's worth adding just in case.
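
A simplified, synchronous sketch of that guard, assuming a Mutex/Condvar in place of whatever async notification the real wait_complete uses. SketchFilter, State, and mark_complete are hypothetical names. The early return is what prevents a wait that can never finish when nobody else holds the filter.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical shared state: a completion flag plus a condition variable.
struct State {
    complete: Mutex<bool>,
    changed: Condvar,
}

/// Hypothetical filter handle; clones share the same state.
#[derive(Clone)]
struct SketchFilter {
    state: Arc<State>,
}

impl SketchFilter {
    fn new() -> Self {
        Self {
            state: Arc::new(State {
                complete: Mutex::new(false),
                changed: Condvar::new(),
            }),
        }
    }

    fn is_used(&self) -> bool {
        Arc::strong_count(&self.state) > 1
    }

    /// Producer side: mark the filter as final and wake any waiters.
    fn mark_complete(&self) {
        *self.state.complete.lock().unwrap() = true;
        self.state.changed.notify_all();
    }

    /// Returns false immediately if no other handle exists (nothing will
    /// ever complete the filter); otherwise blocks until completion.
    fn wait_complete(&self) -> bool {
        if !self.is_used() {
            return false;
        }
        let mut done = self.state.complete.lock().unwrap();
        while !*done {
            done = self.state.changed.wait(done).unwrap();
        }
        true
    }
}
```

Returning a bool is a choice made for this sketch so the early-exit path is observable; the real method may simply return.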

Development

Successfully merging this pull request may close these issues.

Only compute bounds/ dynamic filters if consumer asks for it