From 5516fb16c6a081a2c6a063f49301f24b22167940 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 25 Dec 2025 21:58:54 +0200 Subject: [PATCH 1/4] add benchmarks for large schema --- .../core/benches/aggregate_query_sql.rs | 64 ++++++++ datafusion/core/benches/data_utils/mod.rs | 145 +++++++++++++++++- 2 files changed, 203 insertions(+), 6 deletions(-) diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index 4aa667504e459..251fcb81bf4e7 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -29,7 +29,10 @@ use datafusion::execution::context::SessionContext; use parking_lot::Mutex; use std::hint::black_box; use std::sync::Arc; +use arrow_schema::{Schema, SchemaRef}; +use itertools::Itertools; use tokio::runtime::Runtime; +use crate::data_utils::{create_large_non_nested_schema, create_nested_schema, create_table_provider_from_schema}; #[expect(clippy::needless_pass_by_value)] fn query(ctx: Arc>, rt: &Runtime, sql: &str) { @@ -48,6 +51,19 @@ fn create_context( Ok(Arc::new(Mutex::new(ctx))) } +fn create_context_with_random_data_for_schema( + partitions_len: usize, + array_len: usize, + batch_size: usize, + schema: SchemaRef, +) -> Result>> { + let ctx = SessionContext::new(); + let provider = create_table_provider_from_schema(partitions_len, array_len, batch_size, schema)?; + ctx.register_table("t", provider)?; + Ok(Arc::new(Mutex::new(ctx))) +} + + fn criterion_benchmark(c: &mut Criterion) { let partitions_len = 8; let array_len = 32768 * 2; // 2^16 @@ -55,6 +71,12 @@ fn criterion_benchmark(c: &mut Criterion) { let ctx = create_context(partitions_len, array_len, batch_size).unwrap(); let rt = Runtime::new().unwrap(); + let nested_ctx = create_context_with_random_data_for_schema(1, array_len, batch_size, create_nested_schema()).unwrap(); + let nested_rt = Runtime::new().unwrap(); + + let large_non_nested_schema_ctx = create_context_with_random_data_for_schema(1, array_len, batch_size, create_large_non_nested_schema()).unwrap(); + let large_non_nested_schema_rt = Runtime::new().unwrap(); + c.bench_function("aggregate_query_no_group_by 15 12", |b| { b.iter(|| { query( @@ -188,6 +210,40 @@ fn criterion_benchmark(c: &mut Criterion) { }, ); + { + let column_names = get_all_columns_in_schema(&create_large_non_nested_schema()); + let sql = format!("SELECT {column_names} FROM t GROUP BY {column_names}"); + c.bench_function( + "aggregate_query_on_large_non_nested_schema", + |b| { + b.iter(|| { + query( + large_non_nested_schema_ctx.clone(), + &large_non_nested_schema_rt, + sql.as_str() + ) + }) + }, + ); + } + + { + let column_names = get_all_columns_in_schema(&create_nested_schema()); + let sql = format!("SELECT {column_names} FROM t GROUP BY {column_names}"); + c.bench_function( + "aggregate_query_on_complex_schema", + |b| { + b.iter(|| { + query( + nested_ctx.clone(), + &nested_rt, + sql.as_str() + ) + }) + }, + ); + } + c.bench_function("aggregate_query_approx_percentile_cont_on_u64", |b| { b.iter(|| { query( @@ -258,5 +314,13 @@ fn criterion_benchmark(c: &mut Criterion) { }); } +fn get_all_columns_in_schema(schema: &Schema) -> String { + schema + .fields() + .iter() + .map(|f| f.name()) + .join(", ") +} + criterion_group!(benches, criterion_benchmark); criterion_main!(benches); diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index 630bc056600b4..d9ef2e3c6c5ce 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -33,6 +33,8 @@ use rand_distr::Distribution; use rand_distr::{Normal, Pareto}; use std::fmt::Write; use std::sync::Arc; +use arrow::util::data_gen::create_random_batch; +use arrow_schema::Fields; /// create an in-memory table given the partition len, array len, and batch size, /// and the result table will be of array_len in total, and then partitioned, and batched. @@ -49,6 +51,116 @@ pub fn create_table_provider( MemTable::try_new(schema, partitions).map(Arc::new) } +pub fn create_table_provider_from_schema( + partitions_len: usize, + array_len: usize, + batch_size: usize, + schema: SchemaRef, +) -> Result> { + let partitions = + create_record_batches_from( + schema.clone(), + array_len, + partitions_len, + batch_size, + |schema, rng, batch_size, index| create_random_batch( + schema.clone(), + batch_size, + 0.1, + 0.5 + ).unwrap() + ); + // declare a table in memory. In spark API, this corresponds to createDataFrame(...). + MemTable::try_new(schema, partitions).map(Arc::new) +} + +pub fn create_nested_schema() -> SchemaRef { + let fields = vec![ + Field::new( + "_1", + DataType::Struct(Fields::from(vec![ + Field::new("_1", DataType::Int8, true), + Field::new( + "_2", + DataType::Struct(Fields::from(vec![ + Field::new("_1", DataType::Int8, true), + Field::new( + "_1", + DataType::Struct(Fields::from(vec![ + Field::new("_1", DataType::Int8, true), + Field::new("_2", DataType::Utf8, true), + ])), + true, + ), + Field::new("_2", DataType::UInt8, true), + ])), + true, + ), + ])), + true, + ), + Field::new( + "_2", + DataType::LargeList(Arc::new(Field::new_list_field( + DataType::List(Arc::new(Field::new_list_field( + DataType::Struct(Fields::from(vec![ + Field::new( + "_1", + DataType::Struct(Fields::from(vec![ + Field::new("_1", DataType::Int8, true), + Field::new("_2", DataType::Int16, true), + Field::new("_3", DataType::Int32, true), + ])), + true, + ), + Field::new( + "_2", + DataType::List(Arc::new(Field::new( + "", + DataType::FixedSizeBinary(2), + true, + ))), + true, + ), + ])), + true, + ))), + true, + ))), + true, + ), + ]; + Arc::new(Schema::new(fields)) + +} + + +/// Creating a schema that is supported by Multi group by implementation in aggregation +pub fn create_large_non_nested_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("1", DataType::Utf8, false), + Field::new("2", DataType::LargeUtf8, false), + Field::new("3", DataType::Utf8View, true), + Field::new("4", DataType::Binary, true), + Field::new("5", DataType::LargeBinary, true), + Field::new("6", DataType::BinaryView, true), + Field::new("7", DataType::Int8, false), + Field::new("8", DataType::UInt8, false), + Field::new("9", DataType::Int16, false), + Field::new("10", DataType::UInt16, false), + Field::new("11", DataType::Int32, false), + Field::new("12", DataType::UInt32, false), + Field::new("13", DataType::Int64, false), + Field::new("14", DataType::UInt64, false), + Field::new("15", DataType::Decimal128(5, 2), false), + Field::new("16", DataType::Decimal256(5, 2), false), + Field::new("17", DataType::Float32, false), + Field::new("18", DataType::Float64, true), + Field::new("19", DataType::Boolean, true), + ])) +} + + /// Create test data schema pub fn create_schema() -> Schema { Schema::new(vec![ @@ -145,15 +257,36 @@ pub fn create_record_batches( array_len: usize, partitions_len: usize, batch_size: usize, +) -> Vec> { + create_record_batches_from( + schema, + array_len, + partitions_len, + batch_size, + |schema, rng, batch_size, index| create_record_batch(schema.clone(), rng, batch_size, index) + ) +} + + + +/// Create record batches of `partitions_len` partitions and `batch_size` for each batch, +/// with a total number of `array_len` records +#[expect(clippy::needless_pass_by_value)] +fn create_record_batches_from( + schema: SchemaRef, + array_len: usize, + partitions_len: usize, + batch_size: usize, + record_batch_creator: fn(&SchemaRef, &mut StdRng, usize, usize) -> RecordBatch, ) -> Vec> { let mut rng = StdRng::seed_from_u64(42); (0..partitions_len) - .map(|_| { - (0..array_len / batch_size / partitions_len) - .map(|i| create_record_batch(schema.clone(), &mut rng, batch_size, i)) - .collect::>() - }) - .collect::>() + .map(|_| { + (0..array_len / batch_size / partitions_len) + .map(|i| record_batch_creator(&schema, &mut rng, batch_size, i)) + .collect::>() + }) + .collect::>() } /// An enum that wraps either a regular StringBuilder or a GenericByteViewBuilder From e02d5f6caece43d63faae3efcc1891b8e86efcd3 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 28 Dec 2025 12:59:12 +0200 Subject: [PATCH 2/4] add prefetching for hash aggregate for large maps --- Cargo.lock | 44 +- Cargo.toml | 2 +- datafusion/common/src/config.rs | 4 + datafusion/common/src/hash_utils.rs | 47 +- .../core/benches/aggregate_query_sql.rs | 424 +++++++++--------- datafusion/core/benches/data_utils/mod.rs | 65 ++- datafusion/physical-plan/Cargo.toml | 1 + .../physical-plan/benches/repartition.rs | 329 ++++++++++++++ .../src/aggregates/group_values/mod.rs | 9 +- .../group_values/multi_group_by/mod.rs | 253 ++++++++--- .../src/aggregates/group_values/row.rs | 4 +- .../physical-plan/src/aggregates/row_hash.rs | 3 +- .../physical-plan/src/recursive_query.rs | 2 +- 13 files changed, 889 insertions(+), 298 deletions(-) create mode 100644 datafusion/physical-plan/benches/repartition.rs diff --git a/Cargo.lock b/Cargo.lock index 22ec582536069..b9b4f0c3cb3cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1232,6 +1232,15 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "branches" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21ab2097ac67710acd7d6034dd11b719c086a440ab65b92aa625be166f5995e9" +dependencies = [ + "rustc_version", +] + [[package]] name = "brotli" version = "8.0.2" @@ -1322,6 +1331,15 @@ dependencies = [ "libbz2-rs-sys", ] +[[package]] +name = "cache-size" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df5c3aca3f278263eecfa3e6d2cd1fdd76029f68a0c9e9ed5ab55f0508e70723" +dependencies = [ + "raw-cpuid", +] + [[package]] name = "cast" version = "0.3.0" @@ -1822,7 +1840,7 @@ checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ "cfg-if", "crossbeam-utils", - "hashbrown 0.14.5", + "hashbrown 0.14.5 (registry+https://github.com/rust-lang/crates.io-index)", "lock_api", "once_cell", "parking_lot_core", @@ -2013,7 +2031,7 @@ dependencies = [ "chrono", "criterion", "half", - "hashbrown 0.14.5", + "hashbrown 0.14.5 (git+https://github.com/rluvaton/hashbrown?rev=dbdac43f537223a469746f518929b542f3b8933d)", "hex", "indexmap 2.12.1", "insta", @@ -2495,7 +2513,7 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-physical-expr-common", "half", - "hashbrown 0.14.5", + "hashbrown 0.14.5 (git+https://github.com/rluvaton/hashbrown?rev=dbdac43f537223a469746f518929b542f3b8933d)", "indexmap 2.12.1", "insta", "itertools 0.14.0", @@ -2529,7 +2547,7 @@ dependencies = [ "arrow", "datafusion-common", "datafusion-expr-common", - "hashbrown 0.14.5", + "hashbrown 0.14.5 (git+https://github.com/rluvaton/hashbrown?rev=dbdac43f537223a469746f518929b542f3b8933d)", "itertools 0.14.0", ] @@ -2562,6 +2580,7 @@ dependencies = [ "arrow-ord", "arrow-schema", "async-trait", + "cache-size", "criterion", "datafusion-common", "datafusion-common-runtime", @@ -2576,7 +2595,7 @@ dependencies = [ "datafusion-physical-expr-common", "futures", "half", - "hashbrown 0.14.5", + "hashbrown 0.14.5 (git+https://github.com/rluvaton/hashbrown?rev=dbdac43f537223a469746f518929b542f3b8933d)", "indexmap 2.12.1", "insta", "itertools 0.14.0", @@ -3369,9 +3388,15 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "git+https://github.com/rluvaton/hashbrown?rev=dbdac43f537223a469746f518929b542f3b8933d#dbdac43f537223a469746f518929b542f3b8933d" dependencies = [ "ahash 0.8.12", "allocator-api2", + "branches", ] [[package]] @@ -5106,6 +5131,15 @@ dependencies = [ "rand 0.9.2", ] +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags 2.9.4", +] + [[package]] name = "rayon" version = "1.11.0" diff --git a/Cargo.toml b/Cargo.toml index 10fc88b7057c8..65ebfed76ad7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -156,7 +156,7 @@ flate2 = "1.1.5" futures = "0.3" glob = "0.3.0" half = { version = "2.7.0", default-features = false } -hashbrown = { version = "0.14.5", features = ["raw"] } +hashbrown = { features = ["raw"], git = "https://github.com/rluvaton/hashbrown", rev = "dbdac43f537223a469746f518929b542f3b8933d" } hex = { version = "0.4.3" } indexmap = "2.12.1" insta = { version = "1.45.0", features = ["glob", "filters"] } diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 2bea2ec5a4526..24dff1aedd28e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -649,6 +649,10 @@ config_namespace! { /// # Default /// `false` — ANSI SQL mode is disabled by default. pub enable_ansi_mode: bool, default = false + + pub agg_prefetch_elements: usize, default = 0 + pub agg_prefetch_locality: usize, default = 3 + pub agg_prefetch_read: bool, default = true } } diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 98dd1f235aee7..39ca73a71ecde 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -33,6 +33,7 @@ use crate::cast::{ use crate::error::Result; use crate::error::{_internal_datafusion_err, _internal_err}; use std::cell::RefCell; +use arrow::row::Rows; // Combines two hashes into one hash #[inline] @@ -802,8 +803,8 @@ pub fn create_hashes<'a, I, T>( hashes_buffer: &'a mut [u64], ) -> Result<&'a mut [u64]> where - I: IntoIterator, - T: AsDynArray, + I: IntoIterator, + T: AsDynArray, { for (i, array) in arrays.into_iter().enumerate() { // combine hashes with `combine_hashes` for all columns besides the first @@ -813,6 +814,48 @@ where Ok(hashes_buffer) } +/// Creates hash values for every row, based on the values in the columns. +/// +/// The number of rows to hash is determined by `hashes_buffer.len()`. +/// `hashes_buffer` should be pre-sized appropriately. +pub fn create_hashes_rows<'a>( + rows: arrow::row::RowsIter<'_>, + random_state: &RandomState, + hashes_buffer: &'a mut [u64], +) -> Result<&'a mut [u64]> { + hash_rows(rows, random_state, hashes_buffer); + Ok(hashes_buffer) +} + +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_rows<'a>( + rows: arrow::row::RowsIter<'_>, + random_state: &RandomState, + hashes_buffer: &'a mut [u64] +) { + assert_eq!( + hashes_buffer.len(), + rows.len(), + "hashes_buffer and array should be of equal length" + ); + for (hash, row) in hashes_buffer.iter_mut().zip(rows) { + *hash = row.data().hash_one(random_state); + } +} + +/// Test version of `hash_rows` that forces all hashes to collide to zero. +#[cfg(feature = "force_hash_collisions")] +fn hash_rows( + _rows: arrow::row::RowsIter<'_>, + _random_state: &RandomState, + hashes_buffer: &'a mut [u64] +) -> Result<()> { + for hash in hashes_buffer.iter_mut() { + *hash = 0 + } + Ok(()) +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index 251fcb81bf4e7..20cd47266e4e5 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -32,7 +32,7 @@ use std::sync::Arc; use arrow_schema::{Schema, SchemaRef}; use itertools::Itertools; use tokio::runtime::Runtime; -use crate::data_utils::{create_large_non_nested_schema, create_nested_schema, create_table_provider_from_schema}; +use crate::data_utils::{create_large_non_nested_schema, create_large_with_nested_schema, create_nested_schema, create_table_provider_from_schema}; #[expect(clippy::needless_pass_by_value)] fn query(ctx: Arc>, rt: &Runtime, sql: &str) { @@ -77,138 +77,141 @@ fn criterion_benchmark(c: &mut Criterion) { let large_non_nested_schema_ctx = create_context_with_random_data_for_schema(1, array_len, batch_size, create_large_non_nested_schema()).unwrap(); let large_non_nested_schema_rt = Runtime::new().unwrap(); - c.bench_function("aggregate_query_no_group_by 15 12", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT MIN(f64), AVG(f64), COUNT(f64) \ - FROM t", - ) - }) - }); - - c.bench_function("aggregate_query_no_group_by_min_max_f64", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT MIN(f64), MAX(f64) \ - FROM t", - ) - }) - }); - - c.bench_function("aggregate_query_no_group_by_count_distinct_wide", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT COUNT(DISTINCT u64_wide) \ - FROM t", - ) - }) - }); - - c.bench_function("aggregate_query_no_group_by_count_distinct_narrow", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT COUNT(DISTINCT u64_narrow) \ - FROM t", - ) - }) - }); - - c.bench_function("aggregate_query_group_by", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \ - FROM t GROUP BY utf8", - ) - }) - }); - - c.bench_function("aggregate_query_group_by_with_filter", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \ - FROM t \ - WHERE f32 > 10 AND f32 < 20 GROUP BY utf8", - ) - }) - }); - - c.bench_function("aggregate_query_group_by_u64 15 12", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \ - FROM t GROUP BY u64_narrow", - ) - }) - }); - - c.bench_function("aggregate_query_group_by_with_filter_u64 15 12", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \ - FROM t \ - WHERE f32 > 10 AND f32 < 20 GROUP BY u64_narrow", - ) - }) - }); - - c.bench_function("aggregate_query_group_by_u64_multiple_keys", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \ - FROM t GROUP BY u64_wide, utf8", - ) - }) - }); - - c.bench_function( - "aggregate_query_group_by_wide_u64_and_string_without_aggregate_expressions", - |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - // Due to the large number of distinct values in u64_wide, - // this query test the actual grouping performance for more than 1 column - "SELECT u64_wide, utf8 \ - FROM t GROUP BY u64_wide, utf8", - ) - }) - }, - ); - - c.bench_function( - "aggregate_query_group_by_wide_u64_and_f32_without_aggregate_expressions", - |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - // Due to the large number of distinct values in u64_wide, - // this query test the actual grouping performance for more than 1 column - "SELECT u64_wide, f32 \ - FROM t GROUP BY u64_wide, f32", - ) - }) - }, - ); + let large_with_nested_schema_ctx = create_context_with_random_data_for_schema(1, array_len, batch_size, create_large_with_nested_schema()).unwrap(); + let large_with_nested_schema_rt = Runtime::new().unwrap(); + // + // c.bench_function("aggregate_query_no_group_by 15 12", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT MIN(f64), AVG(f64), COUNT(f64) \ + // FROM t", + // ) + // }) + // }); + // + // c.bench_function("aggregate_query_no_group_by_min_max_f64", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT MIN(f64), MAX(f64) \ + // FROM t", + // ) + // }) + // }); + // + // c.bench_function("aggregate_query_no_group_by_count_distinct_wide", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT COUNT(DISTINCT u64_wide) \ + // FROM t", + // ) + // }) + // }); + // + // c.bench_function("aggregate_query_no_group_by_count_distinct_narrow", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT COUNT(DISTINCT u64_narrow) \ + // FROM t", + // ) + // }) + // }); + // + // c.bench_function("aggregate_query_group_by", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \ + // FROM t GROUP BY utf8", + // ) + // }) + // }); + // + // c.bench_function("aggregate_query_group_by_with_filter", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \ + // FROM t \ + // WHERE f32 > 10 AND f32 < 20 GROUP BY utf8", + // ) + // }) + // }); + // + // c.bench_function("aggregate_query_group_by_u64 15 12", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \ + // FROM t GROUP BY u64_narrow", + // ) + // }) + // }); + // + // c.bench_function("aggregate_query_group_by_with_filter_u64 15 12", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \ + // FROM t \ + // WHERE f32 > 10 AND f32 < 20 GROUP BY u64_narrow", + // ) + // }) + // }); + // + // c.bench_function("aggregate_query_group_by_u64_multiple_keys", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \ + // FROM t GROUP BY u64_wide, utf8", + // ) + // }) + // }); + // + // c.bench_function( + // "aggregate_query_group_by_wide_u64_and_string_without_aggregate_expressions", + // |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // // Due to the large number of distinct values in u64_wide, + // // this query test the actual grouping performance for more than 1 column + // "SELECT u64_wide, utf8 \ + // FROM t GROUP BY u64_wide, utf8", + // ) + // }) + // }, + // ); + // + // c.bench_function( + // "aggregate_query_group_by_wide_u64_and_f32_without_aggregate_expressions", + // |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // // Due to the large number of distinct values in u64_wide, + // // this query test the actual grouping performance for more than 1 column + // "SELECT u64_wide, f32 \ + // FROM t GROUP BY u64_wide, f32", + // ) + // }) + // }, + // ); { let column_names = get_all_columns_in_schema(&create_large_non_nested_schema()); @@ -227,6 +230,23 @@ fn criterion_benchmark(c: &mut Criterion) { ); } + { + let column_names = get_all_columns_in_schema(&create_large_with_nested_schema()); + let sql = format!("SELECT {column_names} FROM t GROUP BY {column_names}"); + c.bench_function( + "aggregate_query_on_large_with_nested_schema", + |b| { + b.iter(|| { + query( + large_with_nested_schema_ctx.clone(), + &large_with_nested_schema_rt, + sql.as_str() + ) + }) + }, + ); + } + { let column_names = get_all_columns_in_schema(&create_nested_schema()); let sql = format!("SELECT {column_names} FROM t GROUP BY {column_names}"); @@ -243,75 +263,75 @@ fn criterion_benchmark(c: &mut Criterion) { }, ); } - - c.bench_function("aggregate_query_approx_percentile_cont_on_u64", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT utf8, approx_percentile_cont(0.5, 2500) WITHIN GROUP (ORDER BY u64_wide) \ - FROM t GROUP BY utf8", - ) - }) - }); - - c.bench_function("aggregate_query_approx_percentile_cont_on_f32", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT utf8, approx_percentile_cont(0.5, 2500) WITHIN GROUP (ORDER BY f32) \ - FROM t GROUP BY utf8", - ) - }) - }); - - c.bench_function("aggregate_query_distinct_median", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT MEDIAN(DISTINCT u64_wide), MEDIAN(DISTINCT u64_narrow) \ - FROM t", - ) - }) - }); - - c.bench_function("first_last_many_columns", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT first_value(u64_wide order by f64, u64_narrow, utf8),\ - last_value(u64_wide order by f64, u64_narrow, utf8) \ - FROM t GROUP BY u64_narrow", - ) - }) - }); - - c.bench_function("first_last_ignore_nulls", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT first_value(u64_wide ignore nulls order by f64, u64_narrow, utf8), \ - last_value(u64_wide ignore nulls order by f64, u64_narrow, utf8) \ - FROM t GROUP BY u64_narrow", - ) - }) - }); - - c.bench_function("first_last_one_column", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT first_value(u64_wide order by f64), \ - last_value(u64_wide order by f64) \ - FROM t GROUP BY u64_narrow", - ) - }) - }); + // + // c.bench_function("aggregate_query_approx_percentile_cont_on_u64", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT utf8, approx_percentile_cont(0.5, 2500) WITHIN GROUP (ORDER BY u64_wide) \ + // FROM t GROUP BY utf8", + // ) + // }) + // }); + // + // c.bench_function("aggregate_query_approx_percentile_cont_on_f32", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT utf8, approx_percentile_cont(0.5, 2500) WITHIN GROUP (ORDER BY f32) \ + // FROM t GROUP BY utf8", + // ) + // }) + // }); + // + // c.bench_function("aggregate_query_distinct_median", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT MEDIAN(DISTINCT u64_wide), MEDIAN(DISTINCT u64_narrow) \ + // FROM t", + // ) + // }) + // }); + // + // c.bench_function("first_last_many_columns", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT first_value(u64_wide order by f64, u64_narrow, utf8),\ + // last_value(u64_wide order by f64, u64_narrow, utf8) \ + // FROM t GROUP BY u64_narrow", + // ) + // }) + // }); + // + // c.bench_function("first_last_ignore_nulls", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT first_value(u64_wide ignore nulls order by f64, u64_narrow, utf8), \ + // last_value(u64_wide ignore nulls order by f64, u64_narrow, utf8) \ + // FROM t GROUP BY u64_narrow", + // ) + // }) + // }); + // + // c.bench_function("first_last_one_column", |b| { + // b.iter(|| { + // query( + // ctx.clone(), + // &rt, + // "SELECT first_value(u64_wide order by f64), \ + // last_value(u64_wide order by f64) \ + // FROM t GROUP BY u64_narrow", + // ) + // }) + // }); } fn get_all_columns_in_schema(schema: &Schema) -> String { diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index d9ef2e3c6c5ce..b1826411413b6 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -138,25 +138,52 @@ pub fn create_nested_schema() -> SchemaRef { /// Creating a schema that is supported by Multi group by implementation in aggregation pub fn create_large_non_nested_schema() -> SchemaRef { Arc::new(Schema::new(vec![ - Field::new("1", DataType::Utf8, false), - Field::new("2", DataType::LargeUtf8, false), - Field::new("3", DataType::Utf8View, true), - Field::new("4", DataType::Binary, true), - Field::new("5", DataType::LargeBinary, true), - Field::new("6", DataType::BinaryView, true), - Field::new("7", DataType::Int8, false), - Field::new("8", DataType::UInt8, false), - Field::new("9", DataType::Int16, false), - Field::new("10", DataType::UInt16, false), - Field::new("11", DataType::Int32, false), - Field::new("12", DataType::UInt32, false), - Field::new("13", DataType::Int64, false), - Field::new("14", DataType::UInt64, false), - Field::new("15", DataType::Decimal128(5, 2), false), - Field::new("16", DataType::Decimal256(5, 2), false), - Field::new("17", DataType::Float32, false), - Field::new("18", DataType::Float64, true), - Field::new("19", DataType::Boolean, true), + Field::new("a1", DataType::Utf8, false), + Field::new("a2", DataType::LargeUtf8, false), + Field::new("a3", DataType::Utf8View, true), + Field::new("a4", DataType::Binary, true), + Field::new("a5", DataType::LargeBinary, true), + Field::new("a6", DataType::BinaryView, true), + Field::new("a7", DataType::Int8, false), + Field::new("a8", DataType::UInt8, false), + Field::new("a9", DataType::Int16, false), + Field::new("a10", DataType::UInt16, false), + Field::new("a11", DataType::Int32, false), + Field::new("a12", DataType::UInt32, false), + Field::new("a13", DataType::Int64, false), + Field::new("a14", DataType::UInt64, false), + // Field::new("a15", DataType::Decimal128(5, 2), false), + // Field::new("a16", DataType::Decimal256(5, 2), false), + Field::new("a17", DataType::Float32, false), + Field::new("a18", DataType::Float64, true), + Field::new("a19", DataType::Boolean, true), + ])) +} + +/// Creating a schema that is supported by Multi group by implementation in aggregation +pub fn create_large_with_nested_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("a1", DataType::Utf8, false), + Field::new("a2", DataType::LargeUtf8, false), + Field::new("a3", DataType::Utf8View, true), + Field::new("a4", DataType::Binary, true), + Field::new("a5", DataType::LargeBinary, true), + Field::new("a6", DataType::BinaryView, true), + Field::new("a7", DataType::Int8, false), + Field::new("a8", DataType::UInt8, false), + Field::new("a9", DataType::Int16, false), + Field::new("a10", DataType::UInt16, false), + Field::new("a11", DataType::Int32, false), + Field::new("a12", DataType::UInt32, false), + Field::new("a13", DataType::Int64, false), + Field::new("a14", DataType::UInt64, false), + // Field::new("a15", DataType::Decimal128(5, 2), false), + // Field::new("a16", DataType::Decimal256(5, 2), false), + Field::new("a17", DataType::Float32, false), + Field::new("a18", DataType::Float64, true), + Field::new("a19", DataType::Boolean, true), + Field::new("a20", DataType::Struct(create_large_non_nested_schema().fields.clone()), true), + Field::new_list("a21", Field::new_list_field(DataType::Utf8, true), true), ])) } diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 68e67fa018f08..da5c45dcb2de0 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -70,6 +70,7 @@ log = { workspace = true } parking_lot = { workspace = true } pin-project-lite = "^0.2.7" tokio = { workspace = true } +cache-size = "0.7.0" [dev-dependencies] criterion = { workspace = true, features = ["async_futures"] } diff --git a/datafusion/physical-plan/benches/repartition.rs b/datafusion/physical-plan/benches/repartition.rs new file mode 100644 index 0000000000000..fca8e23e6133a --- /dev/null +++ b/datafusion/physical-plan/benches/repartition.rs @@ -0,0 +1,329 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::ArrayRef; +use arrow::datatypes::{Int32Type, StringViewType}; +use arrow::util::bench_util::{ + create_primitive_array, create_string_view_array_with_len, + create_string_view_array_with_max_len, +}; +use arrow::util::test_util::seedable_rng; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use criterion::measurement::WallTime; +use criterion::{ + BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main, +}; +use datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupColumn; +use datafusion_physical_plan::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder; +use datafusion_physical_plan::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder; +use rand::distr::{Bernoulli, Distribution}; +use std::hint::black_box; +use std::sync::Arc; +use datafusion_physical_plan::memory::LazyMemoryExec; +use datafusion_physical_plan::repartition::RepartitionExec; + +const SIZES: [usize; 3] = [1_000, 10_000, 100_000]; +const NULL_DENSITIES: [f32; 3] = [0.0, 0.1, 0.5]; + +fn bench_repartition(c: &mut Criterion) { + byte_view_vectorized_append(c); + primitive_vectorized_append(c); +} + +fn get_schema() -> SchemaRef { + // Large schema with many columns + + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Float32, false), + Field::new("d", DataType::Decimal128(5, 2), true), + Field::new("e", DataType::Binary, true), + Field::new("f", DataType::Utf8, true), + // Field::new_list("g", DataType::Utf8, true), + ])) +} + +fn get_infinite_stream() { + let a = LazyMemoryExec::try_new() +} + +fn bench_repartition_with(c: &mut Criterion) { + let source = LazyMemoryExec::try_new() + RepartitionExec::try_new() +} + +fn byte_view_vectorized_append(c: &mut Criterion) { + let mut group = c.benchmark_group("ByteViewGroupValueBuilder_vectorized_append"); + + for &size in &SIZES { + let rows: Vec = (0..size).collect(); + + for &null_density in &NULL_DENSITIES { + let input = create_string_view_array_with_len(size, null_density, 8, false); + let input: ArrayRef = Arc::new(input); + + bytes_bench(&mut group, "inline", size, &rows, null_density, &input); + } + } + + for &size in &SIZES { + let rows: Vec = (0..size).collect(); + + for &null_density in &NULL_DENSITIES { + let input = create_string_view_array_with_len(size, null_density, 64, true); + let input: ArrayRef = Arc::new(input); + + bytes_bench(&mut group, "scenario", size, &rows, null_density, &input); + } + } + + for &size in &SIZES { + let rows: Vec = (0..size).collect(); + + for &null_density in &NULL_DENSITIES { + let input = create_string_view_array_with_max_len(size, null_density, 400); + let input: ArrayRef = Arc::new(input); + + bytes_bench(&mut group, "random", size, &rows, null_density, &input); + } + } + + group.finish(); +} + +fn bytes_bench( + group: &mut BenchmarkGroup, + bench_prefix: &str, + size: usize, + rows: &Vec, + null_density: f32, + input: &ArrayRef, +) { + // vectorized_append + let function_name = format!("{bench_prefix}_null_{null_density:.1}_size_{size}"); + let id = BenchmarkId::new(&function_name, "vectorized_append"); + group.bench_function(id, |b| { + b.iter(|| { + let mut builder = ByteViewGroupValueBuilder::::new(); + builder.vectorized_append(input, rows).unwrap(); + }); + }); + + // append_val + let id = BenchmarkId::new(&function_name, "append_val"); + group.bench_function(id, |b| { + b.iter(|| { + let mut builder = ByteViewGroupValueBuilder::::new(); + for &i in rows { + builder.append_val(input, i).unwrap(); + } + }); + }); + + // vectorized_equal_to + vectorized_equal_to( + group, + ByteViewGroupValueBuilder::::new(), + &function_name, + rows, + input, + "all_true", + vec![true; size], + ); + vectorized_equal_to( + group, + ByteViewGroupValueBuilder::::new(), + &function_name, + rows, + input, + "0.75 true", + { + let mut rng = seedable_rng(); + let d = Bernoulli::new(0.75).unwrap(); + (0..size).map(|_| d.sample(&mut rng)).collect::>() + }, + ); + vectorized_equal_to( + group, + ByteViewGroupValueBuilder::::new(), + &function_name, + rows, + input, + "0.5 true", + { + let mut rng = seedable_rng(); + let d = Bernoulli::new(0.5).unwrap(); + (0..size).map(|_| d.sample(&mut rng)).collect::>() + }, + ); + vectorized_equal_to( + group, + ByteViewGroupValueBuilder::::new(), + &function_name, + rows, + input, + "0.25 true", + { + let mut rng = seedable_rng(); + let d = Bernoulli::new(0.25).unwrap(); + (0..size).map(|_| d.sample(&mut rng)).collect::>() + }, + ); + // Not adding 0 true case here as if we optimize for 0 true cases the caller should avoid calling this method at all +} + +fn primitive_vectorized_append(c: &mut Criterion) { + let mut group = c.benchmark_group("PrimitiveGroupValueBuilder_vectorized_append"); + + for &size in &SIZES { + let rows: Vec = (0..size).collect(); + + for &null_density in &NULL_DENSITIES { + if null_density == 0.0 { + bench_single_primitive::(&mut group, size, &rows, null_density) + } + bench_single_primitive::(&mut group, size, &rows, null_density); + } + } + + group.finish(); +} + +fn bench_single_primitive( + group: &mut BenchmarkGroup, + size: usize, + rows: &Vec, + null_density: f32, +) { + if !NULLABLE { + assert_eq!( + null_density, 0.0, + "non-nullable case must have null_density 0" + ); + } + + let input = create_primitive_array::(size, null_density); + let input: ArrayRef = Arc::new(input); + let function_name = format!("null_{null_density:.1}_nullable_{NULLABLE}_size_{size}"); + + // vectorized_append + let id = BenchmarkId::new(&function_name, "vectorized_append"); + group.bench_function(id, |b| { + b.iter(|| { + let mut builder = + PrimitiveGroupValueBuilder::::new(DataType::Int32); + builder.vectorized_append(&input, rows).unwrap(); + }); + }); + + // append_val + let id = BenchmarkId::new(&function_name, "append_val"); + group.bench_function(id, |b| { + b.iter(|| { + let mut builder = + PrimitiveGroupValueBuilder::::new(DataType::Int32); + for &i in rows { + builder.append_val(&input, i).unwrap(); + } + }); + }); + + // vectorized_equal_to + vectorized_equal_to( + group, + PrimitiveGroupValueBuilder::::new(DataType::Int32), + &function_name, + rows, + &input, + "all_true", + vec![true; size], + ); + vectorized_equal_to( + group, + PrimitiveGroupValueBuilder::::new(DataType::Int32), + &function_name, + rows, + &input, + "0.75 true", + { + let mut rng = seedable_rng(); + let d = Bernoulli::new(0.75).unwrap(); + (0..size).map(|_| d.sample(&mut rng)).collect::>() + }, + ); + vectorized_equal_to( + group, + PrimitiveGroupValueBuilder::::new(DataType::Int32), + &function_name, + rows, + &input, + "0.5 true", + { + let mut rng = seedable_rng(); + let d = Bernoulli::new(0.5).unwrap(); + (0..size).map(|_| d.sample(&mut rng)).collect::>() + }, + ); + vectorized_equal_to( + group, + PrimitiveGroupValueBuilder::::new(DataType::Int32), + &function_name, + rows, + &input, + "0.25 true", + { + let mut rng = seedable_rng(); + let d = Bernoulli::new(0.25).unwrap(); + (0..size).map(|_| d.sample(&mut rng)).collect::>() + }, + ); + // Not adding 0 true case here as if we optimize for 0 true cases the caller should avoid calling this method at all +} + +/// Test `vectorized_equal_to` with different number of true in the initial results +#[expect(clippy::needless_pass_by_value)] +fn vectorized_equal_to( + group: &mut BenchmarkGroup, + mut builder: GroupColumnBuilder, + function_name: &str, + rows: &[usize], + input: &ArrayRef, + equal_to_result_description: &str, + equal_to_results: Vec, +) { + let id = BenchmarkId::new( + function_name, + format!("vectorized_equal_to_{equal_to_result_description}"), + ); + group.bench_function(id, |b| { + builder.vectorized_append(input, rows).unwrap(); + + b.iter(|| { + // Cloning is a must as `vectorized_equal_to` will modify the input vec + // and without cloning all benchmarks after the first one won't be meaningful + let mut equal_to_results = equal_to_results.clone(); + builder.vectorized_equal_to(rows, input, rows, &mut equal_to_results); + + // Make sure that the compiler does not optimize away the call + black_box(equal_to_results); + }); + }); +} + +criterion_group!(benches, bench_repartition); +criterion_main!(benches); diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index 2f3b1a19e7d73..f01f4eacd7313 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -25,7 +25,7 @@ use arrow::array::types::{ use arrow::array::{ArrayRef, downcast_primitive}; use arrow::datatypes::{DataType, SchemaRef, TimeUnit}; use datafusion_common::Result; - +use datafusion_execution::TaskContext; use datafusion_expr::EmitTo; pub mod multi_group_by; @@ -134,6 +134,7 @@ pub trait GroupValues: Send { pub fn new_group_values( schema: SchemaRef, group_ordering: &GroupOrdering, + ctx: Option<&TaskContext> ) -> Result> { if schema.fields.len() == 1 { let d = schema.fields[0].data_type(); @@ -200,11 +201,11 @@ pub fn new_group_values( } } - if multi_group_by::supported_schema(schema.as_ref()) { + if schema.fields.len() <= 4 && multi_group_by::supported_schema(schema.as_ref()) { if matches!(group_ordering, GroupOrdering::None) { - Ok(Box::new(GroupValuesColumn::::try_new(schema)?)) + Ok(Box::new(GroupValuesColumn::::try_new(schema, ctx)?)) } else { - Ok(Box::new(GroupValuesColumn::::try_new(schema)?)) + Ok(Box::new(GroupValuesColumn::::try_new(schema, ctx)?)) } } else { Ok(Box::new(GroupValuesRows::try_new(schema)?)) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 479bff001e3c8..8f29df21f2124 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -23,14 +23,14 @@ pub mod bytes_view; pub mod primitive; use std::mem::{self, size_of}; - +use std::sync::LazyLock; use crate::aggregates::group_values::GroupValues; use crate::aggregates::group_values::multi_group_by::{ boolean::BooleanGroupValueBuilder, bytes::ByteGroupValueBuilder, bytes_view::ByteViewGroupValueBuilder, primitive::PrimitiveGroupValueBuilder, }; use ahash::RandomState; -use arrow::array::{Array, ArrayRef}; +use arrow::array::{Array, ArrayRef, RecordBatch}; use arrow::compute::cast; use arrow::datatypes::{ BinaryViewType, DataType, Date32Type, Date64Type, Decimal128Type, Float32Type, @@ -47,10 +47,40 @@ use datafusion_expr::EmitTo; use datafusion_physical_expr::binary_map::OutputType; use hashbrown::hash_table::HashTable; +use datafusion_execution::TaskContext; const NON_INLINED_FLAG: u64 = 0x8000000000000000; const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF; +// For mac 65536 +fn l1_cache_size() -> usize { + static SIZE: LazyLock = LazyLock::new(|| cache_size::l1_cache_size().unwrap_or( + // 32KB + 32 * 1024 + )); + + *SIZE +} +// for mac 4194304 +fn l2_cache_size() -> usize { + static SIZE: LazyLock = LazyLock::new(|| cache_size::l2_cache_size().unwrap_or( + // 1MB + 1024 * 1024 + )); + + *SIZE +} + +// No L3 for mac so maybe 4194304 * 100 to have as threshold? +fn l3_cache_size() -> usize { + static SIZE: LazyLock = LazyLock::new(|| cache_size::l3_cache_size().unwrap_or( + // 12MB + 12 * 1024 * 1024 + )); + + *SIZE +} + /// Trait for storing a single column of group values in [`GroupValuesColumn`] /// /// Implementations of this trait store an in-progress collection of group values @@ -220,6 +250,10 @@ pub struct GroupValuesColumn { /// Random state for creating hashes random_state: RandomState, + + agg_prefetch_elements: usize, + agg_prefetch_locality: usize, + agg_prefetch_read: bool, } /// Buffers to store intermediate results in `vectorized_append` @@ -260,7 +294,7 @@ impl GroupValuesColumn { // ======================================================================== /// Create a new instance of GroupValuesColumn if supported for the specified schema - pub fn try_new(schema: SchemaRef) -> Result { + pub fn try_new(schema: SchemaRef, ctx: Option<&TaskContext>) -> Result { let map = HashTable::with_capacity(0); Ok(Self { schema, @@ -272,6 +306,9 @@ impl GroupValuesColumn { group_values: vec![], hashes_buffer: Default::default(), random_state: crate::aggregates::AGGREGATION_HASH_SEED, + agg_prefetch_elements: ctx.map(|c| c.session_config().options().execution.agg_prefetch_elements).unwrap_or(0), + agg_prefetch_locality: ctx.map(|c| c.session_config().options().execution.agg_prefetch_locality).unwrap_or(3), + agg_prefetch_read: ctx.map(|c| c.session_config().options().execution.agg_prefetch_read).unwrap_or(true), }) } @@ -491,74 +528,168 @@ impl GroupValuesColumn { batch_hashes: &[u64], groups: &mut [usize], ) { + // ➜ ~ sysctl hw.l1dcachesize + // L1 cache size: 65536 + // L2 cache size: 4194304 + + let should_prefetch = if self.agg_prefetch_locality == 1 { + self.agg_prefetch_elements > 0 && self.map_size >= l1_cache_size() + } else if self.agg_prefetch_locality == 2 { + self.agg_prefetch_elements > 0 && self.map_size >= l2_cache_size() + } else if self.agg_prefetch_locality == 3 { + self.agg_prefetch_elements > 0 && self.map_size >= l3_cache_size() + } else { + self.agg_prefetch_elements > 0 + }; + + if !should_prefetch { + self.collect_vectorized_process_context_with_prefetch::<0>(batch_hashes, groups); + return; + } + + match self.agg_prefetch_elements { + 0 => self.collect_vectorized_process_context_with_prefetch::<0>(batch_hashes, groups), + 1 => self.collect_vectorized_process_context_with_prefetch::<1>(batch_hashes, groups), + 2 => self.collect_vectorized_process_context_with_prefetch::<2>(batch_hashes, groups), + 3 => self.collect_vectorized_process_context_with_prefetch::<3>(batch_hashes, groups), + 4 => self.collect_vectorized_process_context_with_prefetch::<4>(batch_hashes, groups), + 5 => self.collect_vectorized_process_context_with_prefetch::<5>(batch_hashes, groups), + 6 => self.collect_vectorized_process_context_with_prefetch::<6>(batch_hashes, groups), + 7 => self.collect_vectorized_process_context_with_prefetch::<7>(batch_hashes, groups), + 8 => self.collect_vectorized_process_context_with_prefetch::<8>(batch_hashes, groups), + _ => self.collect_vectorized_process_context_with_prefetch::<8>(batch_hashes, groups), + } + } + + fn collect_vectorized_process_context_with_prefetch( + &mut self, + batch_hashes: &[u64], + groups: &mut [usize], + ) { + if self.agg_prefetch_read { + self.collect_vectorized_process_context_with_prefetch_and_read::(batch_hashes, groups) + } else { + self.collect_vectorized_process_context_with_prefetch_and_read::(batch_hashes, groups) + } + } + + fn collect_vectorized_process_context_with_prefetch_and_read( + &mut self, + batch_hashes: &[u64], + groups: &mut [usize], + ) { + match self.agg_prefetch_locality { + 0 => self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::(batch_hashes, groups), + 1 => self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::(batch_hashes, groups), + 2 => self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::(batch_hashes, groups), + 3 => self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::(batch_hashes, groups), + _ => self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::(batch_hashes, groups), + } + } + fn collect_vectorized_process_context_with_prefetch_and_read_and_locality( + &mut self, + batch_hashes: &[u64], + groups: &mut [usize], + ) { self.vectorized_operation_buffers.append_row_indices.clear(); self.vectorized_operation_buffers - .equal_to_row_indices - .clear(); + .equal_to_row_indices + .clear(); self.vectorized_operation_buffers - .equal_to_group_indices - .clear(); + .equal_to_group_indices + .clear(); + + if PREFETCH > 0 { + self.map.reserve(batch_hashes.len(), |(hash, _)| *hash); + let mut group_values_len = self.group_values[0].len(); + if batch_hashes.is_empty() { + return; + } - let mut group_values_len = self.group_values[0].len(); - for (row, &target_hash) in batch_hashes.iter().enumerate() { - let entry = self - .map - .find(target_hash, |(exist_hash, _)| target_hash == *exist_hash); + let mut batch_hashes_iter = batch_hashes[0..batch_hashes.len().saturating_sub(PREFETCH)].iter().enumerate(); - let Some((_, group_index_view)) = entry else { - // 1. Bucket not found case - // Build `new inlined group index view` - let current_group_idx = group_values_len; - let group_index_view = - GroupIndexView::new_inlined(current_group_idx as u64); - - // Insert the `group index view` and its hash into `map` - // for hasher function, use precomputed hash value - self.map.insert_accounted( - (target_hash, group_index_view), - |(hash, _)| *hash, - &mut self.map_size, - ); - - // Add row index to `vectorized_append_row_indices` - self.vectorized_operation_buffers - .append_row_indices - .push(row); - - // Set group index to row in `groups` - groups[row] = current_group_idx; - - group_values_len += 1; - continue; - }; + for (row, &target_hash) in batch_hashes_iter { + // prefetch next item + for i in 1..=PREFETCH { + if READ { + self.map.prefetch_read::(batch_hashes[row + i]); + } else { + self.map.prefetch_write::(batch_hashes[row + i]); + } + } + self.insert_entry(groups, &mut group_values_len, row, target_hash); + } + for index in batch_hashes.len().saturating_sub(PREFETCH)..batch_hashes.len() { + self.insert_entry(groups, &mut group_values_len, index, batch_hashes[index]); + } + } else { + let mut group_values_len = self.group_values[0].len(); - // 2. bucket found - // Check if the `group index view` is `inlined` or `non_inlined` - if group_index_view.is_non_inlined() { - // Non-inlined case, the value of view is offset in `group_index_lists`. - // We use it to get `group_index_list`, and add related `rows` and `group_indices` - // into `vectorized_equal_to_row_indices` and `vectorized_equal_to_group_indices`. - let list_offset = group_index_view.value() as usize; - let group_index_list = &self.group_index_lists[list_offset]; - - self.vectorized_operation_buffers - .equal_to_group_indices - .extend_from_slice(group_index_list); - self.vectorized_operation_buffers - .equal_to_row_indices - .extend(std::iter::repeat_n(row, group_index_list.len())); - } else { - let group_index = group_index_view.value() as usize; - self.vectorized_operation_buffers - .equal_to_row_indices - .push(row); - self.vectorized_operation_buffers - .equal_to_group_indices - .push(group_index); + for (row, &target_hash) in batch_hashes.iter().enumerate() { + self.insert_entry(groups, &mut group_values_len, row, target_hash); } } } + #[inline(always)] + fn insert_entry(&mut self, groups: &mut [usize], group_values_len: &mut usize, row: usize, target_hash: u64) { + let entry = self + .map + .find(target_hash, |(exist_hash, _)| target_hash == *exist_hash); + + let Some((_, group_index_view)) = entry else { + // 1. Bucket not found case + // Build `new inlined group index view` + let current_group_idx = *group_values_len; + let group_index_view = + GroupIndexView::new_inlined(current_group_idx as u64); + + // Insert the `group index view` and its hash into `map` + // for hasher function, use precomputed hash value + self.map.insert_accounted( + (target_hash, group_index_view), + |(hash, _)| *hash, + &mut self.map_size, + ); + + // Add row index to `vectorized_append_row_indices` + self.vectorized_operation_buffers + .append_row_indices + .push(row); + + // Set group index to row in `groups` + groups[row] = current_group_idx; + + *group_values_len += 1; + return; + }; + + // 2. bucket found + // Check if the `group index view` is `inlined` or `non_inlined` + if group_index_view.is_non_inlined() { + // Non-inlined case, the value of view is offset in `group_index_lists`. + // We use it to get `group_index_list`, and add related `rows` and `group_indices` + // into `vectorized_equal_to_row_indices` and `vectorized_equal_to_group_indices`. + let list_offset = group_index_view.value() as usize; + let group_index_list = &self.group_index_lists[list_offset]; + + self.vectorized_operation_buffers + .equal_to_group_indices + .extend_from_slice(group_index_list); + self.vectorized_operation_buffers + .equal_to_row_indices + .extend(std::iter::repeat_n(row, group_index_list.len())); + } else { + let group_index = group_index_view.value() as usize; + self.vectorized_operation_buffers + .equal_to_row_indices + .push(row); + self.vectorized_operation_buffers + .equal_to_group_indices + .push(group_index); + } + } + /// Perform `vectorized_append`` for `rows` in `vectorized_append_row_indices` fn vectorized_append(&mut self, cols: &[ArrayRef]) -> Result<()> { if self diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs index dd794c957350d..bfc262c313ea7 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/row.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs @@ -22,10 +22,10 @@ use arrow::compute::cast; use arrow::datatypes::{DataType, SchemaRef}; use arrow::row::{RowConverter, Rows, SortField}; use datafusion_common::Result; -use datafusion_common::hash_utils::create_hashes; +use datafusion_common::hash_utils::{create_hashes, create_hashes_rows}; use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt}; use datafusion_expr::EmitTo; -use hashbrown::hash_table::HashTable; +use hashbrown::hash_table::{Entry, HashTable}; use log::debug; use std::mem::size_of; use std::sync::Arc; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index cb22fbf9a06a1..8fb8624c58df0 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -17,6 +17,7 @@ //! Hash aggregation +use std::ops::Deref; use std::sync::Arc; use std::task::{Context, Poll}; use std::vec; @@ -590,7 +591,7 @@ impl GroupedHashAggregateStream { _ => OutOfMemoryMode::ReportError, }; - let group_values = new_group_values(group_schema, &group_ordering)?; + let group_values = new_group_values(group_schema, &group_ordering, Some(context.deref()))?; let reservation = MemoryConsumer::new(name) // We interpret 'can spill' as 'can handle memory back pressure'. // This value needs to be set to true for the default memory pool implementations diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 3e7c75b0c8e85..7531ea0c6ca35 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -456,7 +456,7 @@ struct DistinctDeduplicator { impl DistinctDeduplicator { fn new(schema: SchemaRef, task_context: &TaskContext) -> Result { - let group_values = new_group_values(schema, &GroupOrdering::None)?; + let group_values = new_group_values(schema, &GroupOrdering::None, Some(task_context))?; let reservation = MemoryConsumer::new("RecursiveQueryHashTable") .register(task_context.memory_pool()); Ok(Self { From 3c646107f1cbd53497fb92a4d148f316b8c9c5db Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 28 Dec 2025 13:41:16 +0200 Subject: [PATCH 3/4] change defaults --- datafusion/common/src/config.rs | 4 ++-- .../src/aggregates/group_values/multi_group_by/mod.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 24dff1aedd28e..dc2b2290dbe87 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -650,9 +650,9 @@ config_namespace! { /// `false` — ANSI SQL mode is disabled by default. pub enable_ansi_mode: bool, default = false - pub agg_prefetch_elements: usize, default = 0 + pub agg_prefetch_elements: usize, default = 1 pub agg_prefetch_locality: usize, default = 3 - pub agg_prefetch_read: bool, default = true + pub agg_prefetch_read: bool, default = false } } diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 8f29df21f2124..37a1abfb5928c 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -306,9 +306,9 @@ impl GroupValuesColumn { group_values: vec![], hashes_buffer: Default::default(), random_state: crate::aggregates::AGGREGATION_HASH_SEED, - agg_prefetch_elements: ctx.map(|c| c.session_config().options().execution.agg_prefetch_elements).unwrap_or(0), + agg_prefetch_elements: ctx.map(|c| c.session_config().options().execution.agg_prefetch_elements).unwrap_or(1), agg_prefetch_locality: ctx.map(|c| c.session_config().options().execution.agg_prefetch_locality).unwrap_or(3), - agg_prefetch_read: ctx.map(|c| c.session_config().options().execution.agg_prefetch_read).unwrap_or(true), + agg_prefetch_read: ctx.map(|c| c.session_config().options().execution.agg_prefetch_read).unwrap_or(false), }) } From b2c912543893b852a45bf95a9db3443973f1877c Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 28 Dec 2025 13:42:15 +0200 Subject: [PATCH 4/4] revert unrelated --- datafusion/common/src/hash_utils.rs | 47 +- .../core/benches/aggregate_query_sql.rs | 474 +++++++----------- datafusion/core/benches/data_utils/mod.rs | 172 +------ .../physical-plan/benches/repartition.rs | 329 ------------ .../src/aggregates/group_values/mod.rs | 2 +- .../src/aggregates/group_values/row.rs | 4 +- 6 files changed, 206 insertions(+), 822 deletions(-) delete mode 100644 datafusion/physical-plan/benches/repartition.rs diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 39ca73a71ecde..98dd1f235aee7 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -33,7 +33,6 @@ use crate::cast::{ use crate::error::Result; use crate::error::{_internal_datafusion_err, _internal_err}; use std::cell::RefCell; -use arrow::row::Rows; // Combines two hashes into one hash #[inline] @@ -803,8 +802,8 @@ pub fn create_hashes<'a, I, T>( hashes_buffer: &'a mut [u64], ) -> Result<&'a mut [u64]> where - I: IntoIterator, - T: AsDynArray, + I: IntoIterator, + T: AsDynArray, { for (i, array) in arrays.into_iter().enumerate() { // combine hashes with `combine_hashes` for all columns besides the first @@ -814,48 +813,6 @@ where Ok(hashes_buffer) } -/// Creates hash values for every row, based on the values in the columns. -/// -/// The number of rows to hash is determined by `hashes_buffer.len()`. -/// `hashes_buffer` should be pre-sized appropriately. -pub fn create_hashes_rows<'a>( - rows: arrow::row::RowsIter<'_>, - random_state: &RandomState, - hashes_buffer: &'a mut [u64], -) -> Result<&'a mut [u64]> { - hash_rows(rows, random_state, hashes_buffer); - Ok(hashes_buffer) -} - -#[cfg(not(feature = "force_hash_collisions"))] -fn hash_rows<'a>( - rows: arrow::row::RowsIter<'_>, - random_state: &RandomState, - hashes_buffer: &'a mut [u64] -) { - assert_eq!( - hashes_buffer.len(), - rows.len(), - "hashes_buffer and array should be of equal length" - ); - for (hash, row) in hashes_buffer.iter_mut().zip(rows) { - *hash = row.data().hash_one(random_state); - } -} - -/// Test version of `hash_rows` that forces all hashes to collide to zero. -#[cfg(feature = "force_hash_collisions")] -fn hash_rows( - _rows: arrow::row::RowsIter<'_>, - _random_state: &RandomState, - hashes_buffer: &'a mut [u64] -) -> Result<()> { - for hash in hashes_buffer.iter_mut() { - *hash = 0 - } - Ok(()) -} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index 20cd47266e4e5..4aa667504e459 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -29,10 +29,7 @@ use datafusion::execution::context::SessionContext; use parking_lot::Mutex; use std::hint::black_box; use std::sync::Arc; -use arrow_schema::{Schema, SchemaRef}; -use itertools::Itertools; use tokio::runtime::Runtime; -use crate::data_utils::{create_large_non_nested_schema, create_large_with_nested_schema, create_nested_schema, create_table_provider_from_schema}; #[expect(clippy::needless_pass_by_value)] fn query(ctx: Arc>, rt: &Runtime, sql: &str) { @@ -51,19 +48,6 @@ fn create_context( Ok(Arc::new(Mutex::new(ctx))) } -fn create_context_with_random_data_for_schema( - partitions_len: usize, - array_len: usize, - batch_size: usize, - schema: SchemaRef, -) -> Result>> { - let ctx = SessionContext::new(); - let provider = create_table_provider_from_schema(partitions_len, array_len, batch_size, schema)?; - ctx.register_table("t", provider)?; - Ok(Arc::new(Mutex::new(ctx))) -} - - fn criterion_benchmark(c: &mut Criterion) { let partitions_len = 8; let array_len = 32768 * 2; // 2^16 @@ -71,275 +55,207 @@ fn criterion_benchmark(c: &mut Criterion) { let ctx = create_context(partitions_len, array_len, batch_size).unwrap(); let rt = Runtime::new().unwrap(); - let nested_ctx = create_context_with_random_data_for_schema(1, array_len, batch_size, create_nested_schema()).unwrap(); - let nested_rt = Runtime::new().unwrap(); + c.bench_function("aggregate_query_no_group_by 15 12", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT MIN(f64), AVG(f64), COUNT(f64) \ + FROM t", + ) + }) + }); - let large_non_nested_schema_ctx = create_context_with_random_data_for_schema(1, array_len, batch_size, create_large_non_nested_schema()).unwrap(); - let large_non_nested_schema_rt = Runtime::new().unwrap(); + c.bench_function("aggregate_query_no_group_by_min_max_f64", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT MIN(f64), MAX(f64) \ + FROM t", + ) + }) + }); - let large_with_nested_schema_ctx = create_context_with_random_data_for_schema(1, array_len, batch_size, create_large_with_nested_schema()).unwrap(); - let large_with_nested_schema_rt = Runtime::new().unwrap(); - // - // c.bench_function("aggregate_query_no_group_by 15 12", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT MIN(f64), AVG(f64), COUNT(f64) \ - // FROM t", - // ) - // }) - // }); - // - // c.bench_function("aggregate_query_no_group_by_min_max_f64", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT MIN(f64), MAX(f64) \ - // FROM t", - // ) - // }) - // }); - // - // c.bench_function("aggregate_query_no_group_by_count_distinct_wide", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT COUNT(DISTINCT u64_wide) \ - // FROM t", - // ) - // }) - // }); - // - // c.bench_function("aggregate_query_no_group_by_count_distinct_narrow", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT COUNT(DISTINCT u64_narrow) \ - // FROM t", - // ) - // }) - // }); - // - // c.bench_function("aggregate_query_group_by", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \ - // FROM t GROUP BY utf8", - // ) - // }) - // }); - // - // c.bench_function("aggregate_query_group_by_with_filter", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \ - // FROM t \ - // WHERE f32 > 10 AND f32 < 20 GROUP BY utf8", - // ) - // }) - // }); - // - // c.bench_function("aggregate_query_group_by_u64 15 12", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \ - // FROM t GROUP BY u64_narrow", - // ) - // }) - // }); - // - // c.bench_function("aggregate_query_group_by_with_filter_u64 15 12", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \ - // FROM t \ - // WHERE f32 > 10 AND f32 < 20 GROUP BY u64_narrow", - // ) - // }) - // }); - // - // c.bench_function("aggregate_query_group_by_u64_multiple_keys", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \ - // FROM t GROUP BY u64_wide, utf8", - // ) - // }) - // }); - // - // c.bench_function( - // "aggregate_query_group_by_wide_u64_and_string_without_aggregate_expressions", - // |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // // Due to the large number of distinct values in u64_wide, - // // this query test the actual grouping performance for more than 1 column - // "SELECT u64_wide, utf8 \ - // FROM t GROUP BY u64_wide, utf8", - // ) - // }) - // }, - // ); - // - // c.bench_function( - // "aggregate_query_group_by_wide_u64_and_f32_without_aggregate_expressions", - // |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // // Due to the large number of distinct values in u64_wide, - // // this query test the actual grouping performance for more than 1 column - // "SELECT u64_wide, f32 \ - // FROM t GROUP BY u64_wide, f32", - // ) - // }) - // }, - // ); + c.bench_function("aggregate_query_no_group_by_count_distinct_wide", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT COUNT(DISTINCT u64_wide) \ + FROM t", + ) + }) + }); - { - let column_names = get_all_columns_in_schema(&create_large_non_nested_schema()); - let sql = format!("SELECT {column_names} FROM t GROUP BY {column_names}"); - c.bench_function( - "aggregate_query_on_large_non_nested_schema", - |b| { - b.iter(|| { - query( - large_non_nested_schema_ctx.clone(), - &large_non_nested_schema_rt, - sql.as_str() - ) - }) - }, - ); - } + c.bench_function("aggregate_query_no_group_by_count_distinct_narrow", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT COUNT(DISTINCT u64_narrow) \ + FROM t", + ) + }) + }); - { - let column_names = get_all_columns_in_schema(&create_large_with_nested_schema()); - let sql = format!("SELECT {column_names} FROM t GROUP BY {column_names}"); - c.bench_function( - "aggregate_query_on_large_with_nested_schema", - |b| { - b.iter(|| { - query( - large_with_nested_schema_ctx.clone(), - &large_with_nested_schema_rt, - sql.as_str() - ) - }) - }, - ); - } + c.bench_function("aggregate_query_group_by", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \ + FROM t GROUP BY utf8", + ) + }) + }); - { - let column_names = get_all_columns_in_schema(&create_nested_schema()); - let sql = format!("SELECT {column_names} FROM t GROUP BY {column_names}"); - c.bench_function( - "aggregate_query_on_complex_schema", - |b| { - b.iter(|| { - query( - nested_ctx.clone(), - &nested_rt, - sql.as_str() - ) - }) - }, - ); - } - // - // c.bench_function("aggregate_query_approx_percentile_cont_on_u64", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT utf8, approx_percentile_cont(0.5, 2500) WITHIN GROUP (ORDER BY u64_wide) \ - // FROM t GROUP BY utf8", - // ) - // }) - // }); - // - // c.bench_function("aggregate_query_approx_percentile_cont_on_f32", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT utf8, approx_percentile_cont(0.5, 2500) WITHIN GROUP (ORDER BY f32) \ - // FROM t GROUP BY utf8", - // ) - // }) - // }); - // - // c.bench_function("aggregate_query_distinct_median", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT MEDIAN(DISTINCT u64_wide), MEDIAN(DISTINCT u64_narrow) \ - // FROM t", - // ) - // }) - // }); - // - // c.bench_function("first_last_many_columns", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT first_value(u64_wide order by f64, u64_narrow, utf8),\ - // last_value(u64_wide order by f64, u64_narrow, utf8) \ - // FROM t GROUP BY u64_narrow", - // ) - // }) - // }); - // - // c.bench_function("first_last_ignore_nulls", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT first_value(u64_wide ignore nulls order by f64, u64_narrow, utf8), \ - // last_value(u64_wide ignore nulls order by f64, u64_narrow, utf8) \ - // FROM t GROUP BY u64_narrow", - // ) - // }) - // }); - // - // c.bench_function("first_last_one_column", |b| { - // b.iter(|| { - // query( - // ctx.clone(), - // &rt, - // "SELECT first_value(u64_wide order by f64), \ - // last_value(u64_wide order by f64) \ - // FROM t GROUP BY u64_narrow", - // ) - // }) - // }); -} + c.bench_function("aggregate_query_group_by_with_filter", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \ + FROM t \ + WHERE f32 > 10 AND f32 < 20 GROUP BY utf8", + ) + }) + }); + + c.bench_function("aggregate_query_group_by_u64 15 12", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \ + FROM t GROUP BY u64_narrow", + ) + }) + }); + + c.bench_function("aggregate_query_group_by_with_filter_u64 15 12", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \ + FROM t \ + WHERE f32 > 10 AND f32 < 20 GROUP BY u64_narrow", + ) + }) + }); + + c.bench_function("aggregate_query_group_by_u64_multiple_keys", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \ + FROM t GROUP BY u64_wide, utf8", + ) + }) + }); + + c.bench_function( + "aggregate_query_group_by_wide_u64_and_string_without_aggregate_expressions", + |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + // Due to the large number of distinct values in u64_wide, + // this query test the actual grouping performance for more than 1 column + "SELECT u64_wide, utf8 \ + FROM t GROUP BY u64_wide, utf8", + ) + }) + }, + ); + + c.bench_function( + "aggregate_query_group_by_wide_u64_and_f32_without_aggregate_expressions", + |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + // Due to the large number of distinct values in u64_wide, + // this query test the actual grouping performance for more than 1 column + "SELECT u64_wide, f32 \ + FROM t GROUP BY u64_wide, f32", + ) + }) + }, + ); + + c.bench_function("aggregate_query_approx_percentile_cont_on_u64", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT utf8, approx_percentile_cont(0.5, 2500) WITHIN GROUP (ORDER BY u64_wide) \ + FROM t GROUP BY utf8", + ) + }) + }); + + c.bench_function("aggregate_query_approx_percentile_cont_on_f32", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT utf8, approx_percentile_cont(0.5, 2500) WITHIN GROUP (ORDER BY f32) \ + FROM t GROUP BY utf8", + ) + }) + }); + + c.bench_function("aggregate_query_distinct_median", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT MEDIAN(DISTINCT u64_wide), MEDIAN(DISTINCT u64_narrow) \ + FROM t", + ) + }) + }); + + c.bench_function("first_last_many_columns", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT first_value(u64_wide order by f64, u64_narrow, utf8),\ + last_value(u64_wide order by f64, u64_narrow, utf8) \ + FROM t GROUP BY u64_narrow", + ) + }) + }); + + c.bench_function("first_last_ignore_nulls", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT first_value(u64_wide ignore nulls order by f64, u64_narrow, utf8), \ + last_value(u64_wide ignore nulls order by f64, u64_narrow, utf8) \ + FROM t GROUP BY u64_narrow", + ) + }) + }); -fn get_all_columns_in_schema(schema: &Schema) -> String { - schema - .fields() - .iter() - .map(|f| f.name()) - .join(", ") + c.bench_function("first_last_one_column", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT first_value(u64_wide order by f64), \ + last_value(u64_wide order by f64) \ + FROM t GROUP BY u64_narrow", + ) + }) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index b1826411413b6..630bc056600b4 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -33,8 +33,6 @@ use rand_distr::Distribution; use rand_distr::{Normal, Pareto}; use std::fmt::Write; use std::sync::Arc; -use arrow::util::data_gen::create_random_batch; -use arrow_schema::Fields; /// create an in-memory table given the partition len, array len, and batch size, /// and the result table will be of array_len in total, and then partitioned, and batched. @@ -51,143 +49,6 @@ pub fn create_table_provider( MemTable::try_new(schema, partitions).map(Arc::new) } -pub fn create_table_provider_from_schema( - partitions_len: usize, - array_len: usize, - batch_size: usize, - schema: SchemaRef, -) -> Result> { - let partitions = - create_record_batches_from( - schema.clone(), - array_len, - partitions_len, - batch_size, - |schema, rng, batch_size, index| create_random_batch( - schema.clone(), - batch_size, - 0.1, - 0.5 - ).unwrap() - ); - // declare a table in memory. In spark API, this corresponds to createDataFrame(...). - MemTable::try_new(schema, partitions).map(Arc::new) -} - -pub fn create_nested_schema() -> SchemaRef { - let fields = vec![ - Field::new( - "_1", - DataType::Struct(Fields::from(vec![ - Field::new("_1", DataType::Int8, true), - Field::new( - "_2", - DataType::Struct(Fields::from(vec![ - Field::new("_1", DataType::Int8, true), - Field::new( - "_1", - DataType::Struct(Fields::from(vec![ - Field::new("_1", DataType::Int8, true), - Field::new("_2", DataType::Utf8, true), - ])), - true, - ), - Field::new("_2", DataType::UInt8, true), - ])), - true, - ), - ])), - true, - ), - Field::new( - "_2", - DataType::LargeList(Arc::new(Field::new_list_field( - DataType::List(Arc::new(Field::new_list_field( - DataType::Struct(Fields::from(vec![ - Field::new( - "_1", - DataType::Struct(Fields::from(vec![ - Field::new("_1", DataType::Int8, true), - Field::new("_2", DataType::Int16, true), - Field::new("_3", DataType::Int32, true), - ])), - true, - ), - Field::new( - "_2", - DataType::List(Arc::new(Field::new( - "", - DataType::FixedSizeBinary(2), - true, - ))), - true, - ), - ])), - true, - ))), - true, - ))), - true, - ), - ]; - Arc::new(Schema::new(fields)) - -} - - -/// Creating a schema that is supported by Multi group by implementation in aggregation -pub fn create_large_non_nested_schema() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("a1", DataType::Utf8, false), - Field::new("a2", DataType::LargeUtf8, false), - Field::new("a3", DataType::Utf8View, true), - Field::new("a4", DataType::Binary, true), - Field::new("a5", DataType::LargeBinary, true), - Field::new("a6", DataType::BinaryView, true), - Field::new("a7", DataType::Int8, false), - Field::new("a8", DataType::UInt8, false), - Field::new("a9", DataType::Int16, false), - Field::new("a10", DataType::UInt16, false), - Field::new("a11", DataType::Int32, false), - Field::new("a12", DataType::UInt32, false), - Field::new("a13", DataType::Int64, false), - Field::new("a14", DataType::UInt64, false), - // Field::new("a15", DataType::Decimal128(5, 2), false), - // Field::new("a16", DataType::Decimal256(5, 2), false), - Field::new("a17", DataType::Float32, false), - Field::new("a18", DataType::Float64, true), - Field::new("a19", DataType::Boolean, true), - ])) -} - -/// Creating a schema that is supported by Multi group by implementation in aggregation -pub fn create_large_with_nested_schema() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("a1", DataType::Utf8, false), - Field::new("a2", DataType::LargeUtf8, false), - Field::new("a3", DataType::Utf8View, true), - Field::new("a4", DataType::Binary, true), - Field::new("a5", DataType::LargeBinary, true), - Field::new("a6", DataType::BinaryView, true), - Field::new("a7", DataType::Int8, false), - Field::new("a8", DataType::UInt8, false), - Field::new("a9", DataType::Int16, false), - Field::new("a10", DataType::UInt16, false), - Field::new("a11", DataType::Int32, false), - Field::new("a12", DataType::UInt32, false), - Field::new("a13", DataType::Int64, false), - Field::new("a14", DataType::UInt64, false), - // Field::new("a15", DataType::Decimal128(5, 2), false), - // Field::new("a16", DataType::Decimal256(5, 2), false), - Field::new("a17", DataType::Float32, false), - Field::new("a18", DataType::Float64, true), - Field::new("a19", DataType::Boolean, true), - Field::new("a20", DataType::Struct(create_large_non_nested_schema().fields.clone()), true), - Field::new_list("a21", Field::new_list_field(DataType::Utf8, true), true), - ])) -} - - /// Create test data schema pub fn create_schema() -> Schema { Schema::new(vec![ @@ -284,36 +145,15 @@ pub fn create_record_batches( array_len: usize, partitions_len: usize, batch_size: usize, -) -> Vec> { - create_record_batches_from( - schema, - array_len, - partitions_len, - batch_size, - |schema, rng, batch_size, index| create_record_batch(schema.clone(), rng, batch_size, index) - ) -} - - - -/// Create record batches of `partitions_len` partitions and `batch_size` for each batch, -/// with a total number of `array_len` records -#[expect(clippy::needless_pass_by_value)] -fn create_record_batches_from( - schema: SchemaRef, - array_len: usize, - partitions_len: usize, - batch_size: usize, - record_batch_creator: fn(&SchemaRef, &mut StdRng, usize, usize) -> RecordBatch, ) -> Vec> { let mut rng = StdRng::seed_from_u64(42); (0..partitions_len) - .map(|_| { - (0..array_len / batch_size / partitions_len) - .map(|i| record_batch_creator(&schema, &mut rng, batch_size, i)) - .collect::>() - }) - .collect::>() + .map(|_| { + (0..array_len / batch_size / partitions_len) + .map(|i| create_record_batch(schema.clone(), &mut rng, batch_size, i)) + .collect::>() + }) + .collect::>() } /// An enum that wraps either a regular StringBuilder or a GenericByteViewBuilder diff --git a/datafusion/physical-plan/benches/repartition.rs b/datafusion/physical-plan/benches/repartition.rs deleted file mode 100644 index fca8e23e6133a..0000000000000 --- a/datafusion/physical-plan/benches/repartition.rs +++ /dev/null @@ -1,329 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::array::ArrayRef; -use arrow::datatypes::{Int32Type, StringViewType}; -use arrow::util::bench_util::{ - create_primitive_array, create_string_view_array_with_len, - create_string_view_array_with_max_len, -}; -use arrow::util::test_util::seedable_rng; -use arrow_schema::{DataType, Field, Schema, SchemaRef}; -use criterion::measurement::WallTime; -use criterion::{ - BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main, -}; -use datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupColumn; -use datafusion_physical_plan::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder; -use datafusion_physical_plan::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder; -use rand::distr::{Bernoulli, Distribution}; -use std::hint::black_box; -use std::sync::Arc; -use datafusion_physical_plan::memory::LazyMemoryExec; -use datafusion_physical_plan::repartition::RepartitionExec; - -const SIZES: [usize; 3] = [1_000, 10_000, 100_000]; -const NULL_DENSITIES: [f32; 3] = [0.0, 0.1, 0.5]; - -fn bench_repartition(c: &mut Criterion) { - byte_view_vectorized_append(c); - primitive_vectorized_append(c); -} - -fn get_schema() -> SchemaRef { - // Large schema with many columns - - Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int64, false), - Field::new("b", DataType::Int32, false), - Field::new("c", DataType::Float32, false), - Field::new("d", DataType::Decimal128(5, 2), true), - Field::new("e", DataType::Binary, true), - Field::new("f", DataType::Utf8, true), - // Field::new_list("g", DataType::Utf8, true), - ])) -} - -fn get_infinite_stream() { - let a = LazyMemoryExec::try_new() -} - -fn bench_repartition_with(c: &mut Criterion) { - let source = LazyMemoryExec::try_new() - RepartitionExec::try_new() -} - -fn byte_view_vectorized_append(c: &mut Criterion) { - let mut group = c.benchmark_group("ByteViewGroupValueBuilder_vectorized_append"); - - for &size in &SIZES { - let rows: Vec = (0..size).collect(); - - for &null_density in &NULL_DENSITIES { - let input = create_string_view_array_with_len(size, null_density, 8, false); - let input: ArrayRef = Arc::new(input); - - bytes_bench(&mut group, "inline", size, &rows, null_density, &input); - } - } - - for &size in &SIZES { - let rows: Vec = (0..size).collect(); - - for &null_density in &NULL_DENSITIES { - let input = create_string_view_array_with_len(size, null_density, 64, true); - let input: ArrayRef = Arc::new(input); - - bytes_bench(&mut group, "scenario", size, &rows, null_density, &input); - } - } - - for &size in &SIZES { - let rows: Vec = (0..size).collect(); - - for &null_density in &NULL_DENSITIES { - let input = create_string_view_array_with_max_len(size, null_density, 400); - let input: ArrayRef = Arc::new(input); - - bytes_bench(&mut group, "random", size, &rows, null_density, &input); - } - } - - group.finish(); -} - -fn bytes_bench( - group: &mut BenchmarkGroup, - bench_prefix: &str, - size: usize, - rows: &Vec, - null_density: f32, - input: &ArrayRef, -) { - // vectorized_append - let function_name = format!("{bench_prefix}_null_{null_density:.1}_size_{size}"); - let id = BenchmarkId::new(&function_name, "vectorized_append"); - group.bench_function(id, |b| { - b.iter(|| { - let mut builder = ByteViewGroupValueBuilder::::new(); - builder.vectorized_append(input, rows).unwrap(); - }); - }); - - // append_val - let id = BenchmarkId::new(&function_name, "append_val"); - group.bench_function(id, |b| { - b.iter(|| { - let mut builder = ByteViewGroupValueBuilder::::new(); - for &i in rows { - builder.append_val(input, i).unwrap(); - } - }); - }); - - // vectorized_equal_to - vectorized_equal_to( - group, - ByteViewGroupValueBuilder::::new(), - &function_name, - rows, - input, - "all_true", - vec![true; size], - ); - vectorized_equal_to( - group, - ByteViewGroupValueBuilder::::new(), - &function_name, - rows, - input, - "0.75 true", - { - let mut rng = seedable_rng(); - let d = Bernoulli::new(0.75).unwrap(); - (0..size).map(|_| d.sample(&mut rng)).collect::>() - }, - ); - vectorized_equal_to( - group, - ByteViewGroupValueBuilder::::new(), - &function_name, - rows, - input, - "0.5 true", - { - let mut rng = seedable_rng(); - let d = Bernoulli::new(0.5).unwrap(); - (0..size).map(|_| d.sample(&mut rng)).collect::>() - }, - ); - vectorized_equal_to( - group, - ByteViewGroupValueBuilder::::new(), - &function_name, - rows, - input, - "0.25 true", - { - let mut rng = seedable_rng(); - let d = Bernoulli::new(0.25).unwrap(); - (0..size).map(|_| d.sample(&mut rng)).collect::>() - }, - ); - // Not adding 0 true case here as if we optimize for 0 true cases the caller should avoid calling this method at all -} - -fn primitive_vectorized_append(c: &mut Criterion) { - let mut group = c.benchmark_group("PrimitiveGroupValueBuilder_vectorized_append"); - - for &size in &SIZES { - let rows: Vec = (0..size).collect(); - - for &null_density in &NULL_DENSITIES { - if null_density == 0.0 { - bench_single_primitive::(&mut group, size, &rows, null_density) - } - bench_single_primitive::(&mut group, size, &rows, null_density); - } - } - - group.finish(); -} - -fn bench_single_primitive( - group: &mut BenchmarkGroup, - size: usize, - rows: &Vec, - null_density: f32, -) { - if !NULLABLE { - assert_eq!( - null_density, 0.0, - "non-nullable case must have null_density 0" - ); - } - - let input = create_primitive_array::(size, null_density); - let input: ArrayRef = Arc::new(input); - let function_name = format!("null_{null_density:.1}_nullable_{NULLABLE}_size_{size}"); - - // vectorized_append - let id = BenchmarkId::new(&function_name, "vectorized_append"); - group.bench_function(id, |b| { - b.iter(|| { - let mut builder = - PrimitiveGroupValueBuilder::::new(DataType::Int32); - builder.vectorized_append(&input, rows).unwrap(); - }); - }); - - // append_val - let id = BenchmarkId::new(&function_name, "append_val"); - group.bench_function(id, |b| { - b.iter(|| { - let mut builder = - PrimitiveGroupValueBuilder::::new(DataType::Int32); - for &i in rows { - builder.append_val(&input, i).unwrap(); - } - }); - }); - - // vectorized_equal_to - vectorized_equal_to( - group, - PrimitiveGroupValueBuilder::::new(DataType::Int32), - &function_name, - rows, - &input, - "all_true", - vec![true; size], - ); - vectorized_equal_to( - group, - PrimitiveGroupValueBuilder::::new(DataType::Int32), - &function_name, - rows, - &input, - "0.75 true", - { - let mut rng = seedable_rng(); - let d = Bernoulli::new(0.75).unwrap(); - (0..size).map(|_| d.sample(&mut rng)).collect::>() - }, - ); - vectorized_equal_to( - group, - PrimitiveGroupValueBuilder::::new(DataType::Int32), - &function_name, - rows, - &input, - "0.5 true", - { - let mut rng = seedable_rng(); - let d = Bernoulli::new(0.5).unwrap(); - (0..size).map(|_| d.sample(&mut rng)).collect::>() - }, - ); - vectorized_equal_to( - group, - PrimitiveGroupValueBuilder::::new(DataType::Int32), - &function_name, - rows, - &input, - "0.25 true", - { - let mut rng = seedable_rng(); - let d = Bernoulli::new(0.25).unwrap(); - (0..size).map(|_| d.sample(&mut rng)).collect::>() - }, - ); - // Not adding 0 true case here as if we optimize for 0 true cases the caller should avoid calling this method at all -} - -/// Test `vectorized_equal_to` with different number of true in the initial results -#[expect(clippy::needless_pass_by_value)] -fn vectorized_equal_to( - group: &mut BenchmarkGroup, - mut builder: GroupColumnBuilder, - function_name: &str, - rows: &[usize], - input: &ArrayRef, - equal_to_result_description: &str, - equal_to_results: Vec, -) { - let id = BenchmarkId::new( - function_name, - format!("vectorized_equal_to_{equal_to_result_description}"), - ); - group.bench_function(id, |b| { - builder.vectorized_append(input, rows).unwrap(); - - b.iter(|| { - // Cloning is a must as `vectorized_equal_to` will modify the input vec - // and without cloning all benchmarks after the first one won't be meaningful - let mut equal_to_results = equal_to_results.clone(); - builder.vectorized_equal_to(rows, input, rows, &mut equal_to_results); - - // Make sure that the compiler does not optimize away the call - black_box(equal_to_results); - }); - }); -} - -criterion_group!(benches, bench_repartition); -criterion_main!(benches); diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index f01f4eacd7313..a590b539b6d4d 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -201,7 +201,7 @@ pub fn new_group_values( } } - if schema.fields.len() <= 4 && multi_group_by::supported_schema(schema.as_ref()) { + if multi_group_by::supported_schema(schema.as_ref()) { if matches!(group_ordering, GroupOrdering::None) { Ok(Box::new(GroupValuesColumn::::try_new(schema, ctx)?)) } else { diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs index bfc262c313ea7..dd794c957350d 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/row.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs @@ -22,10 +22,10 @@ use arrow::compute::cast; use arrow::datatypes::{DataType, SchemaRef}; use arrow::row::{RowConverter, Rows, SortField}; use datafusion_common::Result; -use datafusion_common::hash_utils::{create_hashes, create_hashes_rows}; +use datafusion_common::hash_utils::create_hashes; use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt}; use datafusion_expr::EmitTo; -use hashbrown::hash_table::{Entry, HashTable}; +use hashbrown::hash_table::HashTable; use log::debug; use std::mem::size_of; use std::sync::Arc;