From e116a9e2861a7330ff87e7ef59891ccc0a68c422 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Wed, 26 Nov 2025 11:12:07 +0100 Subject: [PATCH 01/11] Compute Dynamic Filters only when a consumer supports them --- .../src/expressions/dynamic_filters.rs | 52 +++++++++++++++++++ .../physical-plan/src/joins/hash_join/exec.rs | 16 +++++- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 615d9cbbf61ac..8706dbd756357 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -294,6 +294,16 @@ impl DynamicFilterPhysicalExpr { .await; } + /// Check if this dynamic filter is being actively used by any consumers. + /// + /// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec + /// that created the filter). This is useful to avoid computing expensive filter + /// expressions when no consumer will actually use them. + pub fn is_used(self: &Arc) -> bool { + // Strong count > 1 means at least one consumer is holding a reference beyond the producer. + Arc::strong_count(self) > 1 + } + fn render( &self, f: &mut std::fmt::Formatter<'_>, @@ -691,4 +701,46 @@ mod test { "Expected b + d = [1010, 2020, 3030], got {arr_2:?}", ); } + + #[test] + fn test_is_used() { + let filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![], + lit(true) as Arc, + )); + + // Initially, only one reference exists (the filter itself) + assert!( + !filter.is_used(), + "Filter should not be used with only one reference" + ); + + // Simulate a consumer holding a reference (e.g., ParquetExec) + let consumer1 = Arc::clone(&filter); + assert!( + filter.is_used(), + "Filter should be used with a consumer reference" + ); + + // Multiple consumers + let consumer2 = Arc::clone(&filter); + assert!( + filter.is_used(), + "Filter should still be used with multiple consumers" + ); + + // Drop one consumer + drop(consumer1); + assert!( + filter.is_used(), + "Filter should still be used with remaining consumer" + ); + + // Drop all consumers + drop(consumer2); + assert!( + !filter.is_used(), + "Filter should not be used after all consumers dropped" + ); + } } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index bd92cf496426f..39a8d51fa8648 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -921,7 +921,21 @@ impl ExecutionPlan for HashJoinExec { consider using CoalescePartitionsExec or the EnforceDistribution rule" ); - let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some(); + // Only enable dynamic filter pushdown if: + // - The session config enables dynamic filter pushdown + // - A dynamic filter exists + // - At least one consumer is holding a reference to it, this avoids expensive filter + // computation when disabled or when no consumer will use it. + let enable_dynamic_filter_pushdown = context + .session_config() + .options() + .optimizer + .enable_join_dynamic_filter_pushdown + && self + .dynamic_filter + .as_ref() + .map(|df| df.filter.is_used()) + .unwrap_or(false); let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { From e6130c8a5a17ac1377f9fd2b3bba4f32f1f6f168 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Mon, 29 Dec 2025 14:14:46 +0100 Subject: [PATCH 02/11] use strong_count of inner struct instead --- .../physical-expr/src/expressions/dynamic_filters.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 8706dbd756357..e086869128538 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -299,9 +299,14 @@ impl DynamicFilterPhysicalExpr { /// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec /// that created the filter). This is useful to avoid computing expensive filter /// expressions when no consumer will actually use them. + /// + /// Note: We check the inner Arc's strong_count, not the outer Arc's count, because + /// when filters are transformed, new outer Arc instances + /// are created via with_new_children(), but they all share the same inner Arc>. + /// This is what allows filter updates to propagate to consumers even after transformation. pub fn is_used(self: &Arc) -> bool { // Strong count > 1 means at least one consumer is holding a reference beyond the producer. - Arc::strong_count(self) > 1 + Arc::strong_count(&self.inner) > 1 } fn render( From 1097b7dd8d0298ef0c2e562a48a2e258cf2cd0a3 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Mon, 29 Dec 2025 15:10:05 +0100 Subject: [PATCH 03/11] Adjust unit test --- .../src/expressions/dynamic_filters.rs | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index e086869128538..a89acf53d87e7 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -301,9 +301,10 @@ impl DynamicFilterPhysicalExpr { /// expressions when no consumer will actually use them. /// /// Note: We check the inner Arc's strong_count, not the outer Arc's count, because - /// when filters are transformed, new outer Arc instances - /// are created via with_new_children(), but they all share the same inner Arc>. - /// This is what allows filter updates to propagate to consumers even after transformation. + /// when filters are transformed (e.g., via reassign_expr_columns during filter pushdown), + /// new outer Arc instances are created via with_new_children(), but they all share the + /// same inner Arc>. This is what allows filter updates to propagate to + /// consumers even after transformation. pub fn is_used(self: &Arc) -> bool { // Strong count > 1 means at least one consumer is holding a reference beyond the producer. Arc::strong_count(&self.inner) > 1 @@ -714,38 +715,37 @@ mod test { lit(true) as Arc, )); - // Initially, only one reference exists (the filter itself) + // Initially, only one reference to the inner Arc exists assert!( !filter.is_used(), - "Filter should not be used with only one reference" + "Filter should not be used with only one inner reference" ); - // Simulate a consumer holding a reference (e.g., ParquetExec) - let consumer1 = Arc::clone(&filter); - assert!( - filter.is_used(), - "Filter should be used with a consumer reference" - ); + // Simulate a consumer created via transformation (what happens during filter pushdown). + // When filters are pushed down and transformed via reassign_expr_columns/transform_down, + // with_new_children() is called which creates a new outer Arc but clones the inner Arc. + let consumer1_expr = Arc::clone(&filter).with_new_children(vec![]).unwrap(); + let _consumer1 = consumer1_expr + .as_any() + .downcast_ref::() + .expect("Should be DynamicFilterPhysicalExpr"); - // Multiple consumers - let consumer2 = Arc::clone(&filter); + // Now the inner Arc is shared (inner_count = 2) assert!( filter.is_used(), - "Filter should still be used with multiple consumers" + "Filter should be used when inner Arc is shared with transformed consumer" ); - // Drop one consumer - drop(consumer1); - assert!( - filter.is_used(), - "Filter should still be used with remaining consumer" - ); + // Create another transformed consumer + let consumer2_expr = Arc::clone(&filter).with_new_children(vec![]).unwrap(); + let _consumer2 = consumer2_expr + .as_any() + .downcast_ref::() + .expect("Should be DynamicFilterPhysicalExpr"); - // Drop all consumers - drop(consumer2); assert!( - !filter.is_used(), - "Filter should not be used after all consumers dropped" + filter.is_used(), + "Filter should still be used with multiple consumers" ); } } From 061d2b1edea9fb2511a61e52bf8f2adccb55e64e Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Mon, 29 Dec 2025 16:18:53 +0100 Subject: [PATCH 04/11] add integration test --- .../physical_optimizer/filter_pushdown/mod.rs | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index f480de71d6285..a9b64e3baa759 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -3512,3 +3512,97 @@ async fn test_hashjoin_hash_table_pushdown_integer_keys() { ", ); } + +#[tokio::test] +async fn test_hashjoin_dynamic_filter_pushdown_not_used() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Configure session with dynamic filter pushdown enabled + let session_config = SessionConfig::default() + .with_batch_size(10) + .set_bool("datafusion.execution.parquet.pushdown_filters", true) + .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true); + + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(vec![ + record_batch!(("a", Utf8, ["aa", "ab"]), ("b", Utf8, ["ba", "bb"])).unwrap(), + ]) + .build(); + + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(false) // probe side does not support dynamic filtering + .with_batches(vec![ + record_batch!( + ("a", Utf8, ["aa", "ab", "ac", "ad"]), + ("b", Utf8, ["ba", "bb", "bc", "bd"]) + ) + .unwrap(), + ]) + .build(); + + let on = vec![ + ( + col("a", &build_side_schema).unwrap(), + col("a", &probe_side_schema).unwrap(), + ), + ( + col("b", &build_side_schema).unwrap(), + col("b", &probe_side_schema).unwrap(), + ), + ]; + let plan = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ) as Arc; + + // Apply filter pushdown optimization + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + config.optimizer.enable_dynamic_filter_pushdown = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); + + // Execute the plan to trigger is_used() check + let session_ctx = SessionContext::new_with_config(session_config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let _batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx)) + .await + .unwrap(); + + // After execution, the dynamic filter should remain empty because is_used() returns false. + // Even though dynamic filter pushdown is enabled, the filter is not populated because + // the probe side doesn't support it (no consumer holds a reference to the inner Arc). + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=false + " + ); +} From c33c073ea72ecfb4583924c140deb3065988c0ab Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Mon, 29 Dec 2025 16:28:46 +0100 Subject: [PATCH 05/11] Fix cargo doc --- datafusion/physical-expr/src/expressions/dynamic_filters.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index a89acf53d87e7..d9c2b1b6b9108 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -303,7 +303,7 @@ impl DynamicFilterPhysicalExpr { /// Note: We check the inner Arc's strong_count, not the outer Arc's count, because /// when filters are transformed (e.g., via reassign_expr_columns during filter pushdown), /// new outer Arc instances are created via with_new_children(), but they all share the - /// same inner Arc>. This is what allows filter updates to propagate to + /// same inner `Arc>`. This is what allows filter updates to propagate to /// consumers even after transformation. pub fn is_used(self: &Arc) -> bool { // Strong count > 1 means at least one consumer is holding a reference beyond the producer. From bb1db1295e191b11974f97d9a8f80a3f06c26de0 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Mon, 29 Dec 2025 17:03:56 +0100 Subject: [PATCH 06/11] Change testing approach --- .../physical_optimizer/filter_pushdown/mod.rs | 41 +++++++------------ .../physical-plan/src/joins/hash_join/exec.rs | 11 +++++ 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index a9b64e3baa759..600d1fa392a2d 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -3518,12 +3518,6 @@ async fn test_hashjoin_dynamic_filter_pushdown_not_used() { use datafusion_common::JoinType; use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; - // Configure session with dynamic filter pushdown enabled - let session_config = SessionConfig::default() - .with_batch_size(10) - .set_bool("datafusion.execution.parquet.pushdown_filters", true) - .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true); - let build_side_schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Utf8, false), Field::new("b", DataType::Utf8, false), @@ -3582,27 +3576,20 @@ async fn test_hashjoin_dynamic_filter_pushdown_not_used() { .optimize(plan, &config) .unwrap(); - // Execute the plan to trigger is_used() check - let session_ctx = SessionContext::new_with_config(session_config); - session_ctx.register_object_store( - ObjectStoreUrl::parse("test://").unwrap().as_ref(), - Arc::new(InMemory::new()), - ); - let state = session_ctx.state(); - let task_ctx = state.task_ctx(); - let _batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx)) - .await - .unwrap(); + // Get the HashJoinExec to check the dynamic filter + let hash_join = plan + .as_any() + .downcast_ref::() + .expect("Plan should be HashJoinExec"); - // After execution, the dynamic filter should remain empty because is_used() returns false. - // Even though dynamic filter pushdown is enabled, the filter is not populated because - // the probe side doesn't support it (no consumer holds a reference to the inner Arc). - insta::assert_snapshot!( - format!("{}", format_plan_for_test(&plan)), - @r" - - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=false - " + // Verify that a dynamic filter was created + let dynamic_filter = hash_join + .dynamic_filter_for_test() + .expect("Dynamic filter should be created"); + + // Verify that is_used() returns false because probe side doesn't support filtering + assert!( + !dynamic_filter.is_used(), + "is_used() should return false when probe side doesn't support dynamic filtering" ); } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 39a8d51fa8648..4d5f059002b43 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -508,6 +508,17 @@ impl HashJoinExec { self.null_equality } + /// Get the dynamic filter expression for testing purposes. + /// Returns `None` if no dynamic filter has been set. + /// + /// This method is intended for testing only and should not be used in production code. + #[doc(hidden)] + pub fn dynamic_filter_for_test(&self) -> Option> { + self.dynamic_filter + .as_ref() + .map(|df| Arc::clone(&df.filter)) + } + /// Calculate order preservation flags for this hash join. fn maintains_input_order(join_type: JoinType) -> Vec { vec![ From 392b7c94c3c992f2c83fcbf5622e6d47672ba631 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Mon, 29 Dec 2025 17:13:37 +0100 Subject: [PATCH 07/11] test when is_used should be true and false --- .../physical_optimizer/filter_pushdown/mod.rs | 149 ++++++++++-------- 1 file changed, 79 insertions(+), 70 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 600d1fa392a2d..672e5fa36e0fc 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -3518,78 +3518,87 @@ async fn test_hashjoin_dynamic_filter_pushdown_not_used() { use datafusion_common::JoinType; use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; - let build_side_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Utf8, false), - ])); - let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) - .with_support(true) - .with_batches(vec![ - record_batch!(("a", Utf8, ["aa", "ab"]), ("b", Utf8, ["ba", "bb"])).unwrap(), - ]) - .build(); + // Test both cases: probe side with and without filter pushdown support + for (probe_supports_pushdown, expected_is_used) in [(false, false), (true, true)] { + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(vec![ + record_batch!(("a", Utf8, ["aa", "ab"]), ("b", Utf8, ["ba", "bb"])) + .unwrap(), + ]) + .build(); - let probe_side_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Utf8, false), - ])); - let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) - .with_support(false) // probe side does not support dynamic filtering - .with_batches(vec![ - record_batch!( - ("a", Utf8, ["aa", "ab", "ac", "ad"]), - ("b", Utf8, ["ba", "bb", "bc", "bd"]) + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(probe_supports_pushdown) + .with_batches(vec![ + record_batch!( + ("a", Utf8, ["aa", "ab", "ac", "ad"]), + ("b", Utf8, ["ba", "bb", "bc", "bd"]) + ) + .unwrap(), + ]) + .build(); + + let on = vec![ + ( + col("a", &build_side_schema).unwrap(), + col("a", &probe_side_schema).unwrap(), + ), + ( + col("b", &build_side_schema).unwrap(), + col("b", &probe_side_schema).unwrap(), + ), + ]; + let plan = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + datafusion_common::NullEquality::NullEqualsNothing, ) .unwrap(), - ]) - .build(); - - let on = vec![ - ( - col("a", &build_side_schema).unwrap(), - col("a", &probe_side_schema).unwrap(), - ), - ( - col("b", &build_side_schema).unwrap(), - col("b", &probe_side_schema).unwrap(), - ), - ]; - let plan = Arc::new( - HashJoinExec::try_new( - build_scan, - probe_scan, - on, - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - datafusion_common::NullEquality::NullEqualsNothing, - ) - .unwrap(), - ) as Arc; - - // Apply filter pushdown optimization - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = true; - config.optimizer.enable_dynamic_filter_pushdown = true; - let plan = FilterPushdown::new_post_optimization() - .optimize(plan, &config) - .unwrap(); - - // Get the HashJoinExec to check the dynamic filter - let hash_join = plan - .as_any() - .downcast_ref::() - .expect("Plan should be HashJoinExec"); - - // Verify that a dynamic filter was created - let dynamic_filter = hash_join - .dynamic_filter_for_test() - .expect("Dynamic filter should be created"); + ) as Arc; + + // Apply filter pushdown optimization + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + config.optimizer.enable_dynamic_filter_pushdown = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); - // Verify that is_used() returns false because probe side doesn't support filtering - assert!( - !dynamic_filter.is_used(), - "is_used() should return false when probe side doesn't support dynamic filtering" - ); + // Get the HashJoinExec to check the dynamic filter + let hash_join = plan + .as_any() + .downcast_ref::() + .expect("Plan should be HashJoinExec"); + + // Verify that a dynamic filter was created + let dynamic_filter = hash_join + .dynamic_filter_for_test() + .expect("Dynamic filter should be created"); + + // Verify that is_used() returns the expected value based on probe side support. + // When probe_supports_pushdown=false: no consumer holds a reference (is_used=false) + // When probe_supports_pushdown=true: probe side holds a reference (is_used=true) + assert_eq!( + dynamic_filter.is_used(), + expected_is_used, + "is_used() should return {} when probe side support is {}", + expected_is_used, + probe_supports_pushdown + ); + } } From 0ef0893ae4331d64b7a5e28afc93d564c056241a Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Mon, 29 Dec 2025 17:25:40 +0100 Subject: [PATCH 08/11] clippy --- .../core/tests/physical_optimizer/filter_pushdown/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 672e5fa36e0fc..d93f702fa98d5 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -3596,9 +3596,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_not_used() { assert_eq!( dynamic_filter.is_used(), expected_is_used, - "is_used() should return {} when probe side support is {}", - expected_is_used, - probe_supports_pushdown + "is_used() should return {expected_is_used} when probe side support is {probe_supports_pushdown}" ); } } From 95581d339233758d5fcf92de8c170260ee94035e Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Mon, 29 Dec 2025 17:57:50 +0100 Subject: [PATCH 09/11] Adjust test in exec.rs --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 4d5f059002b43..91fc1ee4436ee 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -4635,6 +4635,11 @@ mod tests { let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); let dynamic_filter_clone = Arc::clone(&dynamic_filter); + // Simulate a consumer by creating a transformed copy (what happens during filter pushdown) + let _consumer = Arc::clone(&dynamic_filter) + .with_new_children(vec![]) + .unwrap(); + // Create HashJoinExec with the dynamic filter let mut join = HashJoinExec::try_new( left, @@ -4683,6 +4688,11 @@ mod tests { let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); let dynamic_filter_clone = Arc::clone(&dynamic_filter); + // Simulate a consumer by creating a transformed copy (what happens during filter pushdown) + let _consumer = Arc::clone(&dynamic_filter) + .with_new_children(vec![]) + .unwrap(); + // Create HashJoinExec with the dynamic filter let mut join = HashJoinExec::try_new( left, From a035dd1fd8c3518b642bd34abcc88d4f5477fb0f Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Mon, 29 Dec 2025 18:16:48 +0100 Subject: [PATCH 10/11] Add is_used check inside wait_complete --- .../physical-expr/src/expressions/dynamic_filters.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index d9c2b1b6b9108..fd8b2667259f5 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -278,12 +278,17 @@ impl DynamicFilterPhysicalExpr { /// Wait asynchronously until this dynamic filter is marked as complete. /// - /// This method returns immediately if the filter is already complete. + /// This method returns immediately if the filter is already complete or if the filter + /// is not being used by any consumers. /// Otherwise, it waits until [`Self::mark_complete`] is called. /// /// Unlike [`Self::wait_update`], this method guarantees that when it returns, /// the filter is fully complete with no more updates expected. - pub async fn wait_complete(&self) { + pub async fn wait_complete(self: &Arc) { + if !self.is_used() { + return; + } + if self.inner.read().is_complete { return; } From c1712bff25b405530292fe3bd3fb692b3c0726d2 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Mon, 29 Dec 2025 18:20:21 +0100 Subject: [PATCH 11/11] test name --- datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index d93f702fa98d5..d6357fdf6bc7d 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -3514,7 +3514,7 @@ async fn test_hashjoin_hash_table_pushdown_integer_keys() { } #[tokio::test] -async fn test_hashjoin_dynamic_filter_pushdown_not_used() { +async fn test_hashjoin_dynamic_filter_pushdown_is_used() { use datafusion_common::JoinType; use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};