Skip to content
88 changes: 88 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3512,3 +3512,91 @@ async fn test_hashjoin_hash_table_pushdown_integer_keys() {
",
);
}

#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
use datafusion_common::JoinType;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

// Test both cases: probe side with and without filter pushdown support
for (probe_supports_pushdown, expected_is_used) in [(false, false), (true, true)] {
let build_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
]));
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
.with_support(true)
.with_batches(vec![
record_batch!(("a", Utf8, ["aa", "ab"]), ("b", Utf8, ["ba", "bb"]))
.unwrap(),
])
.build();

let probe_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
]));
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
.with_support(probe_supports_pushdown)
.with_batches(vec![
record_batch!(
("a", Utf8, ["aa", "ab", "ac", "ad"]),
("b", Utf8, ["ba", "bb", "bc", "bd"])
)
.unwrap(),
])
.build();

let on = vec![
(
col("a", &build_side_schema).unwrap(),
col("a", &probe_side_schema).unwrap(),
),
(
col("b", &build_side_schema).unwrap(),
col("b", &probe_side_schema).unwrap(),
),
];
let plan = Arc::new(
HashJoinExec::try_new(
build_scan,
probe_scan,
on,
None,
&JoinType::Inner,
None,
PartitionMode::CollectLeft,
datafusion_common::NullEquality::NullEqualsNothing,
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;

// Apply filter pushdown optimization
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;
let plan = FilterPushdown::new_post_optimization()
.optimize(plan, &config)
.unwrap();

// Get the HashJoinExec to check the dynamic filter
let hash_join = plan
.as_any()
.downcast_ref::<HashJoinExec>()
.expect("Plan should be HashJoinExec");

// Verify that a dynamic filter was created
let dynamic_filter = hash_join
.dynamic_filter_for_test()
.expect("Dynamic filter should be created");

// Verify that is_used() returns the expected value based on probe side support.
// When probe_supports_pushdown=false: no consumer holds a reference (is_used=false)
// When probe_supports_pushdown=true: probe side holds a reference (is_used=true)
assert_eq!(
dynamic_filter.is_used(),
expected_is_used,
"is_used() should return {expected_is_used} when probe side support is {probe_supports_pushdown}"
);
}
}
66 changes: 64 additions & 2 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,17 @@ impl DynamicFilterPhysicalExpr {

/// Wait asynchronously until this dynamic filter is marked as complete.
///
/// This method returns immediately if the filter is already complete.
/// This method returns immediately if the filter is already complete or if the filter
/// is not being used by any consumers.
/// Otherwise, it waits until [`Self::mark_complete`] is called.
///
/// Unlike [`Self::wait_update`], this method guarantees that when it returns,
/// the filter is fully complete with no more updates expected.
pub async fn wait_complete(&self) {
pub async fn wait_complete(self: &Arc<Self>) {
if !self.is_used() {
return;
}

if self.inner.read().is_complete {
return;
}
Expand All @@ -294,6 +299,22 @@ impl DynamicFilterPhysicalExpr {
.await;
}

/// Check if this dynamic filter is being actively used by any consumers.
///
/// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec
/// that created the filter). This is useful to avoid computing expensive filter
/// expressions when no consumer will actually use them.
///
/// Note: We check the inner Arc's strong_count, not the outer Arc's count, because
/// when filters are transformed (e.g., via reassign_expr_columns during filter pushdown),
/// new outer Arc instances are created via with_new_children(), but they all share the
/// same inner `Arc<RwLock<Inner>>`. This is what allows filter updates to propagate to
/// consumers even after transformation.
pub fn is_used(self: &Arc<Self>) -> bool {
// Strong count > 1 means at least one consumer is holding a reference beyond the producer.
Arc::strong_count(&self.inner) > 1
}

fn render(
&self,
f: &mut std::fmt::Formatter<'_>,
Expand Down Expand Up @@ -691,4 +712,45 @@ mod test {
"Expected b + d = [1010, 2020, 3030], got {arr_2:?}",
);
}

#[test]
fn test_is_used() {
let filter = Arc::new(DynamicFilterPhysicalExpr::new(
vec![],
lit(true) as Arc<dyn PhysicalExpr>,
));

// Initially, only one reference to the inner Arc exists
assert!(
!filter.is_used(),
"Filter should not be used with only one inner reference"
);

// Simulate a consumer created via transformation (what happens during filter pushdown).
// When filters are pushed down and transformed via reassign_expr_columns/transform_down,
// with_new_children() is called which creates a new outer Arc but clones the inner Arc.
let consumer1_expr = Arc::clone(&filter).with_new_children(vec![]).unwrap();
let _consumer1 = consumer1_expr
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.expect("Should be DynamicFilterPhysicalExpr");

// Now the inner Arc is shared (inner_count = 2)
assert!(
filter.is_used(),
"Filter should be used when inner Arc is shared with transformed consumer"
);

// Create another transformed consumer
let consumer2_expr = Arc::clone(&filter).with_new_children(vec![]).unwrap();
let _consumer2 = consumer2_expr
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.expect("Should be DynamicFilterPhysicalExpr");

assert!(
filter.is_used(),
"Filter should still be used with multiple consumers"
);
}
}
37 changes: 36 additions & 1 deletion datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,17 @@ impl HashJoinExec {
self.null_equality
}

/// Get the dynamic filter expression for testing purposes.
/// Returns `None` if no dynamic filter has been set.
///
/// This method is intended for testing only and should not be used in production code.
#[doc(hidden)]
pub fn dynamic_filter_for_test(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
self.dynamic_filter
.as_ref()
.map(|df| Arc::clone(&df.filter))
}

/// Calculate order preservation flags for this hash join.
fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
vec![
Expand Down Expand Up @@ -921,7 +932,21 @@ impl ExecutionPlan for HashJoinExec {
consider using CoalescePartitionsExec or the EnforceDistribution rule"
);

let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
// Only enable dynamic filter pushdown if:
// - The session config enables dynamic filter pushdown
// - A dynamic filter exists
// - At least one consumer is holding a reference to it, this avoids expensive filter
// computation when disabled or when no consumer will use it.
let enable_dynamic_filter_pushdown = context
.session_config()
.options()
.optimizer
.enable_join_dynamic_filter_pushdown
&& self
.dynamic_filter
.as_ref()
.map(|df| df.filter.is_used())
.unwrap_or(false);

let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
let left_fut = match self.mode {
Expand Down Expand Up @@ -4610,6 +4635,11 @@ mod tests {
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let dynamic_filter_clone = Arc::clone(&dynamic_filter);

// Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
let _consumer = Arc::clone(&dynamic_filter)
.with_new_children(vec![])
.unwrap();
Comment on lines +4639 to +4641
Copy link
Contributor Author

@LiaCastaneda LiaCastaneda Dec 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to add a consumer in these tests, otherwise is_used will return false, no filters will be computed and wait_complete will never return. I will add an is_used check inside wait_complete as well, I can't imagine this ever happenning (unless we call wait_complete on a probe node that does not accept dynamic filters which would be wrong usage) but its worth adding just in case.


// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,
Expand Down Expand Up @@ -4658,6 +4688,11 @@ mod tests {
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let dynamic_filter_clone = Arc::clone(&dynamic_filter);

// Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
let _consumer = Arc::clone(&dynamic_filter)
.with_new_children(vec![])
.unwrap();

// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,
Expand Down