Gene.bordegaray/2025/12/add broadcast exec #272
Conversation
made this executable
gabotechs left a comment:
Awesome piece of work!
I had some suggestions for how to model the broadcasting logic; let me know what you think.
```rust
// For Broadcast, distribute_plan handles build/probe sides separately with different task counts.
if nb == &RequiredNetworkBoundary::Broadcast {
    return Ok(annotated_plan);
}
```
It seems like the Broadcast network boundary is bypassing the scale factor calculation mechanism, but I see no reason why that should happen. What I'd try to do here is:
- Make the Broadcast adhere to task count scale factor calculation (just removing these lines should be enough)
- Make a new test in this file that shows how the scale factor is propagated through Broadcast boundaries. For example, a test with a JOIN that has a filter on the probe side should show how the stage containing the join has fewer tasks than the one on the probe side.
```rust
let (num_remaining, cached_batches) = if is_broadcast {
    (
        Arc::new(AtomicUsize::new(consumer_task_count.unwrap_or(1) as usize)),
        Some(Arc::new(OnceCell::new())),
    )
} else {
    let total_partitions = plan.properties().partitioning.partition_count();
    (Arc::new(AtomicUsize::new(total_partitions)), None)
};
```
I think we should be able to make this work without special-casing the broadcast scenario, and without leaking details about how things should be executed to the do_get endpoint.
What I see in this file is that all the logic for broadcasting is placed here, and as a consequence a new `consumer_task_count` is also needed, but this file should just be dumb plumbing for executing arbitrary plans. I think we should be able to ship this PR leaving the do_get.rs file untouched, and I think I have a relatively clear idea on how this should be done:
- Instead of having the broadcasting logic live here, build a new `ExecutionPlan` implementation, something like a `BroadcastExec`, that is in charge of caching the batches and streaming from cache. Note that other systems like Trino or Spark actually do this, but it's just modeled as a new `Partitioning::Broadcast` mode of their `RepartitionExec` equivalent.
- Let the `NetworkBroadcastExec` place this node right below it, in the same way we ensure that a `NetworkShuffleExec` scales up its child `RepartitionExec` accounting for the consumer task count.

With this, the information about the `consumer_task_count` would be baked into the `BroadcastExec` in the same way `NetworkShuffleExec` bakes it into its child `RepartitionExec`, and you don't need to explicitly propagate it with an extra field.
We would also be able to see a `BroadcastExec` in the plan, collect metrics on it, etc.
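For illustration, here is a minimal sketch of the caching piece such a `BroadcastExec` could use, assuming a tokio `OnceCell` guards the collected build-side batches (the `BroadcastCache` name and `get_or_fill` helper are hypothetical, not code from this PR):

```rust
use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use tokio::sync::OnceCell;

/// Hypothetical cache for a BroadcastExec-style node: the child partition is
/// collected at most once, and every later consumer replays the cached batches
/// instead of re-running the child plan.
struct BroadcastCache {
    cached: OnceCell<Arc<Vec<RecordBatch>>>,
}

impl BroadcastCache {
    fn new() -> Self {
        Self { cached: OnceCell::new() }
    }

    /// `produce` collects the batches of the underlying partition; only the first
    /// caller actually runs it, later callers just clone the cached Arc.
    async fn get_or_fill<F, Fut>(&self, produce: F) -> Result<Arc<Vec<RecordBatch>>, DataFusionError>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<Vec<RecordBatch>, DataFusionError>>,
    {
        self.cached
            .get_or_try_init(|| async { Ok(Arc::new(produce().await?)) })
            .await
            .cloned()
    }
}
```

An actual `BroadcastExec` would wrap something like this in the usual `ExecutionPlan` plumbing (properties, children, execute) and turn the cached batches back into a record batch stream.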
One other thing IMO we should do is to not break the assumption that DataFusion partitions are only going to be executed once. Right now this rule applies to all execution nodes, but not here, and that implies doing some juggling with the `num_partitions_remaining` by special-casing it.
Instead, one idea that comes to mind:
If the build side of the JOIN originally had 6 partitions, and we are broadcasting to 3 tasks, scale up the `BroadcastExec` to 18 partitions and let it, under the hood, execute partitions 0,1,2,3,4,5 normally, while partitions 6,7,8,9,10,11 are a mirror of partitions 0,1,2,3,4,5, and partitions 12,13,14,15,16,17 are a mirror of 0,1,2,3,4,5 as well (the `NetworkBroadcastExec` will know where to go).
I think with this we'll end up with a perfectly normal plan that is not special.
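As a rough illustration of the index arithmetic this mirroring implies (the helper name is hypothetical, not code from the PR):

```rust
/// Map a scaled-up partition index of a hypothetical 18-partition BroadcastExec
/// back to one of the child's original partitions, so DataFusion still sees each
/// of the 18 partitions executed exactly once.
fn mirrored_partition(partition: usize, original_partitions: usize) -> usize {
    partition % original_partitions
}

// With 6 original partitions and 3 consumer tasks (18 partitions total):
//   partitions  0..=5  -> child partitions 0..=5 (executed normally)
//   partitions  6..=11 -> child partitions 0..=5 (first mirror)
//   partitions 12..=17 -> child partitions 0..=5 (second mirror)
```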
```rust
    .downcast_ref::<CoalescePartitionsExec>()
    .is_none()
{
    Arc::new(CoalescePartitionsExec::new(input)) as Arc<dyn ExecutionPlan>
```
🤔 I think you should be able to implement this without coalescing partitions below the network boundary, right? And let it have as many network data streams as original partitions * tasks. Isn't `CoalescePartitionsExec` just needed above the network boundary, between the `HashJoinExec` and the `NetworkBroadcastExec`, instead of below it?
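For reference, a hedged sketch of what coalescing above the boundary could look like using DataFusion's existing `CoalescePartitionsExec` (the helper function and where it would be called from are assumptions, not this PR's code):

```rust
use std::sync::Arc;

use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical helper: coalesce on the consumer side, between the HashJoinExec
/// build input and the network boundary, so the boundary itself still exposes one
/// stream per original partition per task.
fn coalesce_above_boundary(network_boundary: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    Arc::new(CoalescePartitionsExec::new(network_boundary))
}
```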
```rust
let stream = plan_ref.execute(target_partition, task_ctx)?;
let batches: Vec<RecordBatch> = stream.try_collect().await?;
Ok::<_, DataFusionError>(Arc::new(batches))
```
Rather than collecting everything before sending it over the wire, I think we should be able to stream it progressively as data comes in; we just need it accumulated right before the `HashJoinExec`, not before the network hop.
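A minimal sketch of that streaming alternative, assuming the do_get endpoint is Arrow Flight based (the `stream_partition` helper is hypothetical):

```rust
use arrow_flight::encode::{FlightDataEncoder, FlightDataEncoderBuilder};
use arrow_flight::error::FlightError;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::TryStreamExt;

/// Hypothetical replacement for try_collect(): encode record batches into Flight
/// data as the partition produces them, instead of buffering the whole result in
/// memory before sending it over the wire.
fn stream_partition(stream: SendableRecordBatchStream) -> FlightDataEncoder {
    FlightDataEncoderBuilder::new()
        .build(stream.map_err(|e| FlightError::ExternalError(Box::new(e))))
}
```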
🤔 I think this was automatically closed because I merged the TPC-DS benchmarks PR into
closes #223
Successor to #229, since the merge conflicts were so bad it just made sense to reimplement in a new branch.
This is merging into datafusion-contrib:gabrielmusat/tpcds-benchmarks, but we can merge into main once that is merged.
Not much to report on TPCH:

But very excited about these 😄
