diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index f480de71d6285..d6357fdf6bc7d 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -3512,3 +3512,91 @@ async fn test_hashjoin_hash_table_pushdown_integer_keys() { ", ); } + +#[tokio::test] +async fn test_hashjoin_dynamic_filter_pushdown_is_used() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Test both cases: probe side with and without filter pushdown support + for (probe_supports_pushdown, expected_is_used) in [(false, false), (true, true)] { + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(vec![ + record_batch!(("a", Utf8, ["aa", "ab"]), ("b", Utf8, ["ba", "bb"])) + .unwrap(), + ]) + .build(); + + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(probe_supports_pushdown) + .with_batches(vec![ + record_batch!( + ("a", Utf8, ["aa", "ab", "ac", "ad"]), + ("b", Utf8, ["ba", "bb", "bc", "bd"]) + ) + .unwrap(), + ]) + .build(); + + let on = vec![ + ( + col("a", &build_side_schema).unwrap(), + col("a", &probe_side_schema).unwrap(), + ), + ( + col("b", &build_side_schema).unwrap(), + col("b", &probe_side_schema).unwrap(), + ), + ]; + let plan = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ) as Arc; + + // Apply filter pushdown optimization + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + config.optimizer.enable_dynamic_filter_pushdown = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); + + // Get the HashJoinExec to check the dynamic filter + let hash_join = plan + .as_any() + .downcast_ref::() + .expect("Plan should be HashJoinExec"); + + // Verify that a dynamic filter was created + let dynamic_filter = hash_join + .dynamic_filter_for_test() + .expect("Dynamic filter should be created"); + + // Verify that is_used() returns the expected value based on probe side support. + // When probe_supports_pushdown=false: no consumer holds a reference (is_used=false) + // When probe_supports_pushdown=true: probe side holds a reference (is_used=true) + assert_eq!( + dynamic_filter.is_used(), + expected_is_used, + "is_used() should return {expected_is_used} when probe side support is {probe_supports_pushdown}" + ); + } +} diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 615d9cbbf61ac..fd8b2667259f5 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -278,12 +278,17 @@ impl DynamicFilterPhysicalExpr { /// Wait asynchronously until this dynamic filter is marked as complete. /// - /// This method returns immediately if the filter is already complete. + /// This method returns immediately if the filter is already complete or if the filter + /// is not being used by any consumers. /// Otherwise, it waits until [`Self::mark_complete`] is called. /// /// Unlike [`Self::wait_update`], this method guarantees that when it returns, /// the filter is fully complete with no more updates expected. - pub async fn wait_complete(&self) { + pub async fn wait_complete(self: &Arc) { + if !self.is_used() { + return; + } + if self.inner.read().is_complete { return; } @@ -294,6 +299,22 @@ impl DynamicFilterPhysicalExpr { .await; } + /// Check if this dynamic filter is being actively used by any consumers. + /// + /// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec + /// that created the filter). This is useful to avoid computing expensive filter + /// expressions when no consumer will actually use them. + /// + /// Note: We check the inner Arc's strong_count, not the outer Arc's count, because + /// when filters are transformed (e.g., via reassign_expr_columns during filter pushdown), + /// new outer Arc instances are created via with_new_children(), but they all share the + /// same inner `Arc>`. This is what allows filter updates to propagate to + /// consumers even after transformation. + pub fn is_used(self: &Arc) -> bool { + // Strong count > 1 means at least one consumer is holding a reference beyond the producer. + Arc::strong_count(&self.inner) > 1 + } + fn render( &self, f: &mut std::fmt::Formatter<'_>, @@ -691,4 +712,45 @@ mod test { "Expected b + d = [1010, 2020, 3030], got {arr_2:?}", ); } + + #[test] + fn test_is_used() { + let filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![], + lit(true) as Arc, + )); + + // Initially, only one reference to the inner Arc exists + assert!( + !filter.is_used(), + "Filter should not be used with only one inner reference" + ); + + // Simulate a consumer created via transformation (what happens during filter pushdown). + // When filters are pushed down and transformed via reassign_expr_columns/transform_down, + // with_new_children() is called which creates a new outer Arc but clones the inner Arc. + let consumer1_expr = Arc::clone(&filter).with_new_children(vec![]).unwrap(); + let _consumer1 = consumer1_expr + .as_any() + .downcast_ref::() + .expect("Should be DynamicFilterPhysicalExpr"); + + // Now the inner Arc is shared (inner_count = 2) + assert!( + filter.is_used(), + "Filter should be used when inner Arc is shared with transformed consumer" + ); + + // Create another transformed consumer + let consumer2_expr = Arc::clone(&filter).with_new_children(vec![]).unwrap(); + let _consumer2 = consumer2_expr + .as_any() + .downcast_ref::() + .expect("Should be DynamicFilterPhysicalExpr"); + + assert!( + filter.is_used(), + "Filter should still be used with multiple consumers" + ); + } } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index bd92cf496426f..91fc1ee4436ee 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -508,6 +508,17 @@ impl HashJoinExec { self.null_equality } + /// Get the dynamic filter expression for testing purposes. + /// Returns `None` if no dynamic filter has been set. + /// + /// This method is intended for testing only and should not be used in production code. + #[doc(hidden)] + pub fn dynamic_filter_for_test(&self) -> Option> { + self.dynamic_filter + .as_ref() + .map(|df| Arc::clone(&df.filter)) + } + /// Calculate order preservation flags for this hash join. fn maintains_input_order(join_type: JoinType) -> Vec { vec![ @@ -921,7 +932,21 @@ impl ExecutionPlan for HashJoinExec { consider using CoalescePartitionsExec or the EnforceDistribution rule" ); - let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some(); + // Only enable dynamic filter pushdown if: + // - The session config enables dynamic filter pushdown + // - A dynamic filter exists + // - At least one consumer is holding a reference to it, this avoids expensive filter + // computation when disabled or when no consumer will use it. + let enable_dynamic_filter_pushdown = context + .session_config() + .options() + .optimizer + .enable_join_dynamic_filter_pushdown + && self + .dynamic_filter + .as_ref() + .map(|df| df.filter.is_used()) + .unwrap_or(false); let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { @@ -4610,6 +4635,11 @@ mod tests { let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); let dynamic_filter_clone = Arc::clone(&dynamic_filter); + // Simulate a consumer by creating a transformed copy (what happens during filter pushdown) + let _consumer = Arc::clone(&dynamic_filter) + .with_new_children(vec![]) + .unwrap(); + // Create HashJoinExec with the dynamic filter let mut join = HashJoinExec::try_new( left, @@ -4658,6 +4688,11 @@ mod tests { let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); let dynamic_filter_clone = Arc::clone(&dynamic_filter); + // Simulate a consumer by creating a transformed copy (what happens during filter pushdown) + let _consumer = Arc::clone(&dynamic_filter) + .with_new_children(vec![]) + .unwrap(); + // Create HashJoinExec with the dynamic filter let mut join = HashJoinExec::try_new( left,