From c1ba62ad17e9eb0b2059b720155612dbaf531957 Mon Sep 17 00:00:00 2001
From: Huaijin
Date: Sun, 28 Dec 2025 18:15:34 +0800
Subject: [PATCH 1/2] refactor: extract the data generate out of benchmark

---
 datafusion/core/benches/topk_aggregate.rs | 124 ++++++++++------------
 1 file changed, 54 insertions(+), 70 deletions(-)

diff --git a/datafusion/core/benches/topk_aggregate.rs b/datafusion/core/benches/topk_aggregate.rs
index a4ae479de4d27..f1b0e24a441bf 100644
--- a/datafusion/core/benches/topk_aggregate.rs
+++ b/datafusion/core/benches/topk_aggregate.rs
@@ -20,10 +20,9 @@ mod data_utils;
 use arrow::util::pretty::pretty_format_batches;
 use criterion::{Criterion, criterion_group, criterion_main};
 use data_utils::make_data;
-use datafusion::physical_plan::{ExecutionPlan, collect, displayable};
+use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::SessionContext;
 use datafusion::{datasource::MemTable, error::Result};
-use datafusion_execution::TaskContext;
 use datafusion_execution::config::SessionConfig;
 use std::hint::black_box;
 use std::sync::Arc;
@@ -36,7 +35,7 @@ async fn create_context(
     asc: bool,
     use_topk: bool,
     use_view: bool,
-) -> Result<(Arc<dyn ExecutionPlan>, Arc<TaskContext>)> {
+) -> Result<SessionContext> {
     let (schema, parts) = make_data(partition_cnt, sample_cnt, asc, use_view).unwrap();
     let mem_table = Arc::new(MemTable::try_new(schema, parts).unwrap());
 
@@ -57,21 +56,32 @@ async fn create_context(
         use_topk
     );
 
-    Ok((physical_plan, ctx.task_ctx()))
+    Ok(ctx)
 }
 
 #[expect(clippy::needless_pass_by_value)]
-fn run(rt: &Runtime, plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>, asc: bool) {
-    black_box(rt.block_on(async { aggregate(plan.clone(), ctx.clone(), asc).await }))
-        .unwrap();
+fn run(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool, asc: bool) {
+    black_box(rt.block_on(async { aggregate(ctx, limit, use_topk, asc).await })).unwrap();
 }
 
 async fn aggregate(
-    plan: Arc<dyn ExecutionPlan>,
-    ctx: Arc<TaskContext>,
+    ctx: SessionContext,
+    limit: usize,
+    use_topk: bool,
     asc: bool,
 ) -> Result<()> {
-    let batches = collect(plan, ctx).await?;
+    let sql = format!(
+        "select max(timestamp_ms) from traces group by trace_id order by max(timestamp_ms) desc limit {limit};"
+    );
+    let df = ctx.sql(sql.as_str()).await?;
+    let plan = df.create_physical_plan().await?;
+    let actual_phys_plan = displayable(plan.as_ref()).indent(true).to_string();
+    assert_eq!(
+        actual_phys_plan.contains(&format!("lim=[{limit}]")),
+        use_topk
+    );
+
+    let batches = collect(plan, ctx.task_ctx()).await?;
     assert_eq!(batches.len(), 1);
     let batch = batches.first().unwrap();
     assert_eq!(batch.num_rows(), 10);
@@ -107,106 +117,80 @@ fn criterion_benchmark(c: &mut Criterion) {
     let partitions = 10;
     let samples = 1_000_000;
 
+    let ctx = rt
+        .block_on(create_context(
+            limit, partitions, samples, false, false, false,
+        ))
+        .unwrap();
     c.bench_function(
         format!("aggregate {} time-series rows", partitions * samples).as_str(),
-        |b| {
-            b.iter(|| {
-                let real = rt.block_on(async {
-                    create_context(limit, partitions, samples, false, false, false)
-                        .await
-                        .unwrap()
-                });
-                run(&rt, real.0.clone(), real.1.clone(), false)
-            })
-        },
+        |b| b.iter(|| run(&rt, ctx.clone(), limit, false, false)),
     );
 
+    let ctx = rt
+        .block_on(create_context(
+            limit, partitions, samples, true, false, false,
+        ))
+        .unwrap();
     c.bench_function(
         format!("aggregate {} worst-case rows", partitions * samples).as_str(),
-        |b| {
-            b.iter(|| {
-                let asc = rt.block_on(async {
-                    create_context(limit, partitions, samples, true, false, false)
-                        .await
-                        .unwrap()
-                });
-                run(&rt, asc.0.clone(), asc.1.clone(), true)
-            })
-        },
+        |b| b.iter(|| run(&rt, ctx.clone(), limit, false, true)),
     );
 
+    let ctx = rt
+        .block_on(create_context(
+            limit, partitions, samples, false, true, false,
+        ))
+        .unwrap();
     c.bench_function(
         format!(
             "top k={limit} aggregate {} time-series rows",
             partitions * samples
         )
         .as_str(),
-        |b| {
-            b.iter(|| {
-                let topk_real = rt.block_on(async {
-                    create_context(limit, partitions, samples, false, true, false)
-                        .await
-                        .unwrap()
-                });
-                run(&rt, topk_real.0.clone(), topk_real.1.clone(), false)
-            })
-        },
+        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
     );
 
+    let ctx = rt
+        .block_on(create_context(
+            limit, partitions, samples, true, true, false,
+        ))
+        .unwrap();
     c.bench_function(
         format!(
            "top k={limit} aggregate {} worst-case rows",
             partitions * samples
         )
         .as_str(),
-        |b| {
-            b.iter(|| {
-                let topk_asc = rt.block_on(async {
-                    create_context(limit, partitions, samples, true, true, false)
-                        .await
-                        .unwrap()
-                });
-                run(&rt, topk_asc.0.clone(), topk_asc.1.clone(), true)
-            })
-        },
+        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
     );
 
     // Utf8View schema,time-series rows
+    let ctx = rt
+        .block_on(create_context(
+            limit, partitions, samples, false, true, true,
+        ))
+        .unwrap();
     c.bench_function(
         format!(
             "top k={limit} aggregate {} time-series rows [Utf8View]",
             partitions * samples
         )
         .as_str(),
-        |b| {
-            b.iter(|| {
-                let topk_real = rt.block_on(async {
-                    create_context(limit, partitions, samples, false, true, true)
-                        .await
-                        .unwrap()
-                });
-                run(&rt, topk_real.0.clone(), topk_real.1.clone(), false)
-            })
-        },
+        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
     );
 
     // Utf8View schema,worst-case rows
+    let ctx = rt
+        .block_on(create_context(limit, partitions, samples, true, true, true))
+        .unwrap();
     c.bench_function(
         format!(
             "top k={limit} aggregate {} worst-case rows [Utf8View]",
             partitions * samples
         )
         .as_str(),
-        |b| {
-            b.iter(|| {
-                let topk_asc = rt.block_on(async {
-                    create_context(limit, partitions, samples, true, true, true)
-                        .await
-                        .unwrap()
-                });
-                run(&rt, topk_asc.0.clone(), topk_asc.1.clone(), true)
-            })
-        },
+        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
     );
 }
 

From 2cde46de07076bbbd78f3cdd68a84850dfafeaaa Mon Sep 17 00:00:00 2001
From: Huaijin
Date: Sun, 28 Dec 2025 18:49:04 +0800
Subject: [PATCH 2/2] fmt

---
 datafusion/core/benches/topk_aggregate.rs | 34 ++++------------------
 1 file changed, 6 insertions(+), 28 deletions(-)

diff --git a/datafusion/core/benches/topk_aggregate.rs b/datafusion/core/benches/topk_aggregate.rs
index f1b0e24a441bf..be193f873713b 100644
--- a/datafusion/core/benches/topk_aggregate.rs
+++ b/datafusion/core/benches/topk_aggregate.rs
@@ -29,7 +29,6 @@ use std::sync::Arc;
 use tokio::runtime::Runtime;
 
 async fn create_context(
-    limit: usize,
     partition_cnt: i32,
     sample_cnt: i32,
     asc: bool,
@@ -45,21 +44,10 @@ async fn create_context(
     opts.optimizer.enable_topk_aggregation = use_topk;
     let ctx = SessionContext::new_with_config(cfg);
     let _ = ctx.register_table("traces", mem_table)?;
-    let sql = format!(
-        "select max(timestamp_ms) from traces group by trace_id order by max(timestamp_ms) desc limit {limit};"
-    );
-    let df = ctx.sql(sql.as_str()).await?;
-    let physical_plan = df.create_physical_plan().await?;
-    let actual_phys_plan = displayable(physical_plan.as_ref()).indent(true).to_string();
-    assert_eq!(
-        actual_phys_plan.contains(&format!("lim=[{limit}]")),
-        use_topk
-    );
 
     Ok(ctx)
 }
 
-#[expect(clippy::needless_pass_by_value)]
 fn run(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool, asc: bool) {
     black_box(rt.block_on(async { aggregate(ctx, limit, use_topk, asc).await })).unwrap();
 }
@@ -118,9 +106,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     let samples = 1_000_000;
 
     let ctx = rt
-        .block_on(create_context(
-            limit, partitions, samples, false, false, false,
-        ))
+        .block_on(create_context(partitions, samples, false, false, false))
         .unwrap();
     c.bench_function(
         format!("aggregate {} time-series rows", partitions * samples).as_str(),
@@ -128,9 +114,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     );
 
     let ctx = rt
-        .block_on(create_context(
-            limit, partitions, samples, true, false, false,
-        ))
+        .block_on(create_context(partitions, samples, true, false, false))
         .unwrap();
     c.bench_function(
         format!("aggregate {} worst-case rows", partitions * samples).as_str(),
@@ -138,9 +122,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     );
 
     let ctx = rt
-        .block_on(create_context(
-            limit, partitions, samples, false, true, false,
-        ))
+        .block_on(create_context(partitions, samples, false, true, false))
         .unwrap();
     c.bench_function(
         format!(
@@ -152,9 +134,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     );
 
     let ctx = rt
-        .block_on(create_context(
-            limit, partitions, samples, true, true, false,
-        ))
+        .block_on(create_context(partitions, samples, true, true, false))
         .unwrap();
     c.bench_function(
         format!(
@@ -167,9 +147,7 @@ fn criterion_benchmark(c: &mut Criterion) {
 
     // Utf8View schema,time-series rows
     let ctx = rt
-        .block_on(create_context(
-            limit, partitions, samples, false, true, true,
-        ))
+        .block_on(create_context(partitions, samples, false, true, true))
         .unwrap();
     c.bench_function(
         format!(
@@ -182,7 +160,7 @@ fn criterion_benchmark(c: &mut Criterion) {
 
     // Utf8View schema,worst-case rows
     let ctx = rt
-        .block_on(create_context(limit, partitions, samples, true, true, true))
+        .block_on(create_context(partitions, samples, true, true, true))
         .unwrap();
     c.bench_function(
         format!(
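
Note (not part of the patch): the series applies the usual Criterion discipline of hoisting expensive setup out of the measured closure, so make_data, MemTable registration, and physical planning no longer run inside b.iter on every iteration. A minimal, self-contained sketch of that pattern, with illustrative names only (expensive_setup stands in for the real data generation):

use criterion::{Criterion, criterion_group, criterion_main};
use std::hint::black_box;

// Stand-in for the expensive part of the real benchmark (make_data + MemTable setup).
fn expensive_setup() -> Vec<u64> {
    (0..1_000_000).collect()
}

fn bench(c: &mut Criterion) {
    // Built once, outside the measured closure, and reused by every iteration.
    let data = expensive_setup();
    c.bench_function("sum 1M u64", |b| {
        // Only this closure is timed by Criterion.
        b.iter(|| black_box(data.iter().copied().sum::<u64>()))
    });
}

criterion_group!(benches, bench);
criterion_main!(benches);

In the refactored benchmark itself, the per-iteration ctx.clone() should only clone a cheap handle to the shared session state, so the registered table data is not rebuilt between iterations.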