
Conversation


@gene-bordegaray gene-bordegaray commented Dec 28, 2025

closes #223

Successor to #229; the merge conflicts were so bad that it just made sense to reimplement in a new branch.

This is merging into datafusion-contrib:gabrielmusat/tpcds-benchmarks, but we can retarget it to main once that branch is merged.

Not much to report on TPCH:
[Screenshot: TPC-H benchmark results, 2025-12-27 6:07 PM]

But very excited about these 😄
[Screenshot: benchmark results, 2025-12-27 9:03 PM]

@gene-bordegaray (Author)

made this executable

@gene-bordegaray gene-bordegaray marked this pull request as ready for review December 28, 2025 01:55
@gabotechs gabotechs deleted the branch datafusion-contrib:gabrielmusat/tpcds-benchmarks December 28, 2025 10:53
@gabotechs gabotechs closed this Dec 28, 2025
@gabotechs gabotechs reopened this Dec 28, 2025
@gabotechs gabotechs self-requested a review December 28, 2025 10:56
@gabotechs (Collaborator) left a comment

Awesome piece of work!

I had some suggestions for how to model the broadcasting logic; let me know what you think.

Comment on lines +226 to +230
// For Broadcast, distribute_plan handles build/probe sides separately with different task counts.
if nb == &RequiredNetworkBoundary::Broadcast {
    return Ok(annotated_plan);
}

@gabotechs (Collaborator)

It seems like the Broadcast network boundary is bypassing the scale factor calculation mechanism, but I see no reason why that should happen. What I'd try to do here is:

  1. Make Broadcast adhere to the task count scale factor calculation (just removing these lines should be enough).
  2. Add a new test in this file that shows how the scale factor is propagated through Broadcast boundaries. For example, a test with a JOIN that has a filter on the probe side should show that the stage containing the join has fewer tasks than the one on the probe side.

Comment on lines +123 to +131
let (num_remaining, cached_batches) = if is_broadcast {
    (
        Arc::new(AtomicUsize::new(consumer_task_count.unwrap_or(1) as usize)),
        Some(Arc::new(OnceCell::new())),
    )
} else {
    let total_partitions = plan.properties().partitioning.partition_count();
    (Arc::new(AtomicUsize::new(total_partitions)), None)
};
@gabotechs (Collaborator) commented Dec 29, 2025

I think we should be able to make this work without special-casing the broadcast scenario, and without leaking details about how things should be executed to the do_get endpoint.

What I see in this file is that all the logic for broadcasting is placed here, and as a consequence a new consumer_task_count is also needed; but this file should just be dumb plumbing for executing arbitrary plans. I think we should be able to ship this PR leaving the do_get.rs file untouched, and I have a relatively clear idea of how this should be done:

  • Instead of having the broadcasting logic live here, build a new ExecutionPlan implementation, something like a BroadcastExec, that is in charge of caching the batches and streaming from the cache. Note that other systems like Trino and Spark actually do this; it's just modeled as a new Partitioning::Broadcast mode of their RepartitionExec equivalent.
  • Let the NetworkBroadcastExec place this node right below it, in the same way we ensure that a NetworkShuffleExec scales up its child RepartitionExec to account for the consumer task count.

With this, the information about the consumer_task_count would be baked into the BroadcastExec in the same way NetworkShuffleExec bakes it into its child RepartitionExec, and you wouldn't need to explicitly propagate it with an extra field.

We would also be able to see a BroadcastExec in the plan, collect metrics in it, etc...
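
A minimal sketch of that cache-once / replay-many idea, assuming a hypothetical BroadcastCache helper and a placeholder Batch type rather than the real ExecutionPlan trait and Arrow RecordBatch:

```rust
use std::sync::{Arc, OnceLock};

// Placeholder for an Arrow RecordBatch; a real BroadcastExec would cache RecordBatches.
type Batch = Vec<u8>;

/// Hypothetical helper showing the behavior a BroadcastExec could encapsulate:
/// the first consumer materializes the child's output, later consumers replay it.
struct BroadcastCache {
    cached: OnceLock<Arc<Vec<Batch>>>,
}

impl BroadcastCache {
    fn new() -> Self {
        Self { cached: OnceLock::new() }
    }

    /// `produce` runs the child plan at most once; every other caller is
    /// served from the cache, so all consumer tasks see the same batches.
    fn get_or_build(&self, produce: impl FnOnce() -> Vec<Batch>) -> Arc<Vec<Batch>> {
        Arc::clone(self.cached.get_or_init(|| Arc::new(produce())))
    }
}

fn main() {
    let cache = BroadcastCache::new();
    // The first consumer executes the build side...
    let first = cache.get_or_build(|| vec![vec![1, 2, 3]]);
    // ...subsequent consumers only replay the cached result.
    let second = cache.get_or_build(|| unreachable!("child is not re-executed"));
    assert!(Arc::ptr_eq(&first, &second));
}
```

In the actual node, the OnceCell already used in the do_get snippet above could play the same role, just owned by the BroadcastExec instead of the endpoint.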

@gabotechs (Collaborator) commented Dec 29, 2025

One other thing IMO we should do is not break the assumption that DataFusion partitions are only going to be executed once. Right now this rule applies to all execution nodes, but not here, and that implies doing some juggling with num_partitions_remaining by special-casing it.

Instead, one idea that comes to mind is:

If the build side of the JOIN originally had 6 partitions and we are broadcasting to 3 tasks, scale up the BroadcastExec to 18 partitions and, under the hood, let it execute partitions 0,1,2,3,4,5 normally, while partitions 6,7,8,9,10,11 are a mirror of partitions 0,1,2,3,4,5, and partitions 12,13,14,15,16,17 are a mirror of 0,1,2,3,4,5 as well (NetworkBroadcastExec will know where to go).

I think with this we'll end up with a perfectly normal plan that is not special.
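
To make the arithmetic concrete, here is a small sketch; the mirrored_partition helper is hypothetical, purely for illustration:

```rust
/// Hypothetical helper: a BroadcastExec advertising
/// `child_partitions * consumer_tasks` partitions maps every requested
/// partition back onto one of the child's real partitions.
fn mirrored_partition(requested: usize, child_partitions: usize) -> usize {
    requested % child_partitions
}

fn main() {
    let child_partitions = 6; // build side originally has 6 partitions
    let consumer_tasks = 3;   // we are broadcasting to 3 tasks
    let advertised = child_partitions * consumer_tasks; // BroadcastExec exposes 18

    // Partitions 0..=5 run normally; 6..=11 and 12..=17 mirror 0..=5, so every
    // advertised partition is still executed exactly once, keeping the
    // "partitions are executed once" assumption intact.
    for p in 0..advertised {
        println!("advertised partition {p} -> child partition {}", mirrored_partition(p, child_partitions));
    }
}
```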

.downcast_ref::<CoalescePartitionsExec>()
.is_none()
{
Arc::new(CoalescePartitionsExec::new(input)) as Arc<dyn ExecutionPlan>
@gabotechs (Collaborator) commented Dec 29, 2025

🤔 I think you should be able to implement this without coalescing partitions below the network boundary, right? And let it have as many network data streams as original partitions * tasks. Isn't CoalescePartitionsExec just needed above the network boundary, between HashJoinExec and NetworkBroadcastExec, instead of below it?

Comment on lines +171 to +173
let stream = plan_ref.execute(target_partition, task_ctx)?;
let batches: Vec<RecordBatch> = stream.try_collect().await?;
Ok::<_, DataFusionError>(Arc::new(batches))
@gabotechs (Collaborator)

Rather than collecting everything before sending it over the wire, I think we should be able to stream it progressively as data comes in; we only need it accumulated right before the HashJoinExec, not before the network hop.
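
As a sketch of the difference, with placeholder Batch and Error types and a generic send callback standing in for the actual network writer (not shown in this excerpt):

```rust
use futures::{Stream, StreamExt, TryStreamExt};

// Placeholder types standing in for RecordBatch and DataFusionError.
type Batch = Vec<u8>;
type Error = String;

/// What the snippet above does: materialize the whole build side in memory
/// before anything crosses the network.
async fn collect_then_send(
    stream: impl Stream<Item = Result<Batch, Error>>,
) -> Result<Vec<Batch>, Error> {
    stream.try_collect().await
}

/// Streaming alternative: forward each batch as soon as it is produced and let
/// the consumer side accumulate right before the HashJoinExec.
async fn send_as_it_comes(
    stream: impl Stream<Item = Result<Batch, Error>>,
    mut send: impl FnMut(Batch) -> Result<(), Error>,
) -> Result<(), Error> {
    futures::pin_mut!(stream);
    while let Some(batch) = stream.next().await {
        send(batch?)?;
    }
    Ok(())
}

fn main() -> Result<(), Error> {
    let batches = vec![Ok(vec![1u8]), Ok(vec![2, 3])];
    // Each batch is forwarded as soon as it arrives instead of being buffered.
    futures::executor::block_on(send_as_it_comes(futures::stream::iter(batches), |batch| {
        println!("sending batch of {} bytes", batch.len());
        Ok(())
    }))
}
```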

@gabotechs gabotechs deleted the branch datafusion-contrib:gabrielmusat/tpcds-benchmarks December 29, 2025 11:43
@gabotechs gabotechs closed this Dec 29, 2025
@gabotechs (Collaborator) commented Dec 29, 2025

🤔 I think this was automatically closed because I merged the TPC-DS benchmarks PR into main; I'm not sure why it did not start pointing to main automatically. I do not seem to be able to re-open it, though. I imagine that because this comes from your fork, you'll need to point it to main manually?
