diff --git a/Cargo.lock b/Cargo.lock
index 22ec582536069..b9b4f0c3cb3cb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1232,6 +1232,15 @@ dependencies = [
  "syn 2.0.111",
 ]
 
+[[package]]
+name = "branches"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "21ab2097ac67710acd7d6034dd11b719c086a440ab65b92aa625be166f5995e9"
+dependencies = [
+ "rustc_version",
+]
+
 [[package]]
 name = "brotli"
 version = "8.0.2"
@@ -1322,6 +1331,15 @@ dependencies = [
  "libbz2-rs-sys",
 ]
 
+[[package]]
+name = "cache-size"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df5c3aca3f278263eecfa3e6d2cd1fdd76029f68a0c9e9ed5ab55f0508e70723"
+dependencies = [
+ "raw-cpuid",
+]
+
 [[package]]
 name = "cast"
 version = "0.3.0"
@@ -1822,7 +1840,7 @@ checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
 dependencies = [
  "cfg-if",
  "crossbeam-utils",
- "hashbrown 0.14.5",
+ "hashbrown 0.14.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "lock_api",
  "once_cell",
  "parking_lot_core",
@@ -2013,7 +2031,7 @@ dependencies = [
  "chrono",
  "criterion",
  "half",
- "hashbrown 0.14.5",
+ "hashbrown 0.14.5 (git+https://github.com/rluvaton/hashbrown?rev=dbdac43f537223a469746f518929b542f3b8933d)",
  "hex",
  "indexmap 2.12.1",
  "insta",
@@ -2495,7 +2513,7 @@ dependencies = [
  "datafusion-functions-aggregate-common",
  "datafusion-physical-expr-common",
  "half",
- "hashbrown 0.14.5",
+ "hashbrown 0.14.5 (git+https://github.com/rluvaton/hashbrown?rev=dbdac43f537223a469746f518929b542f3b8933d)",
  "indexmap 2.12.1",
  "insta",
  "itertools 0.14.0",
@@ -2529,7 +2547,7 @@ dependencies = [
  "arrow",
  "datafusion-common",
  "datafusion-expr-common",
- "hashbrown 0.14.5",
+ "hashbrown 0.14.5 (git+https://github.com/rluvaton/hashbrown?rev=dbdac43f537223a469746f518929b542f3b8933d)",
  "itertools 0.14.0",
 ]
 
@@ -2562,6 +2580,7 @@ dependencies = [
  "arrow-ord",
  "arrow-schema",
  "async-trait",
+ "cache-size",
  "criterion",
  "datafusion-common",
  "datafusion-common-runtime",
@@ -2576,7 +2595,7 @@ dependencies = [
  "datafusion-physical-expr-common",
  "futures",
  "half",
- "hashbrown 0.14.5",
+ "hashbrown 0.14.5 (git+https://github.com/rluvaton/hashbrown?rev=dbdac43f537223a469746f518929b542f3b8933d)",
  "indexmap 2.12.1",
  "insta",
  "itertools 0.14.0",
@@ -3369,9 +3388,15 @@ name = "hashbrown"
 version = "0.14.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
+
+[[package]]
+name = "hashbrown"
+version = "0.14.5"
+source = "git+https://github.com/rluvaton/hashbrown?rev=dbdac43f537223a469746f518929b542f3b8933d#dbdac43f537223a469746f518929b542f3b8933d"
 dependencies = [
  "ahash 0.8.12",
  "allocator-api2",
+ "branches",
 ]
 
@@ -5106,6 +5131,15 @@ dependencies = [
  "rand 0.9.2",
 ]
 
+[[package]]
+name = "raw-cpuid"
+version = "11.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186"
+dependencies = [
+ "bitflags 2.9.4",
+]
+
 [[package]]
 name = "rayon"
 version = "1.11.0"
diff --git a/Cargo.toml b/Cargo.toml
index 10fc88b7057c8..65ebfed76ad7f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -156,7 +156,7 @@ flate2 = "1.1.5"
 futures = "0.3"
 glob = "0.3.0"
 half = { version = "2.7.0", default-features = false }
-hashbrown = { version = "0.14.5", features = ["raw"] }
+hashbrown = { features = ["raw"], git = "https://github.com/rluvaton/hashbrown", rev = "dbdac43f537223a469746f518929b542f3b8933d" }
"dbdac43f537223a469746f518929b542f3b8933d" } hex = { version = "0.4.3" } indexmap = "2.12.1" insta = { version = "1.45.0", features = ["glob", "filters"] } diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 2bea2ec5a4526..dc2b2290dbe87 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -649,6 +649,10 @@ config_namespace! { /// # Default /// `false` — ANSI SQL mode is disabled by default. pub enable_ansi_mode: bool, default = false + + pub agg_prefetch_elements: usize, default = 1 + pub agg_prefetch_locality: usize, default = 3 + pub agg_prefetch_read: bool, default = false } } diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 68e67fa018f08..da5c45dcb2de0 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -70,6 +70,7 @@ log = { workspace = true } parking_lot = { workspace = true } pin-project-lite = "^0.2.7" tokio = { workspace = true } +cache-size = "0.7.0" [dev-dependencies] criterion = { workspace = true, features = ["async_futures"] } diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index 2f3b1a19e7d73..a590b539b6d4d 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -25,7 +25,7 @@ use arrow::array::types::{ use arrow::array::{ArrayRef, downcast_primitive}; use arrow::datatypes::{DataType, SchemaRef, TimeUnit}; use datafusion_common::Result; - +use datafusion_execution::TaskContext; use datafusion_expr::EmitTo; pub mod multi_group_by; @@ -134,6 +134,7 @@ pub trait GroupValues: Send { pub fn new_group_values( schema: SchemaRef, group_ordering: &GroupOrdering, + ctx: Option<&TaskContext> ) -> Result> { if schema.fields.len() == 1 { let d = schema.fields[0].data_type(); @@ -202,9 +203,9 @@ pub fn new_group_values( if multi_group_by::supported_schema(schema.as_ref()) { if matches!(group_ordering, GroupOrdering::None) { - Ok(Box::new(GroupValuesColumn::::try_new(schema)?)) + Ok(Box::new(GroupValuesColumn::::try_new(schema, ctx)?)) } else { - Ok(Box::new(GroupValuesColumn::::try_new(schema)?)) + Ok(Box::new(GroupValuesColumn::::try_new(schema, ctx)?)) } } else { Ok(Box::new(GroupValuesRows::try_new(schema)?)) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 479bff001e3c8..37a1abfb5928c 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -23,14 +23,14 @@ pub mod bytes_view; pub mod primitive; use std::mem::{self, size_of}; - +use std::sync::LazyLock; use crate::aggregates::group_values::GroupValues; use crate::aggregates::group_values::multi_group_by::{ boolean::BooleanGroupValueBuilder, bytes::ByteGroupValueBuilder, bytes_view::ByteViewGroupValueBuilder, primitive::PrimitiveGroupValueBuilder, }; use ahash::RandomState; -use arrow::array::{Array, ArrayRef}; +use arrow::array::{Array, ArrayRef, RecordBatch}; use arrow::compute::cast; use arrow::datatypes::{ BinaryViewType, DataType, Date32Type, Date64Type, Decimal128Type, Float32Type, @@ -47,10 +47,40 @@ use datafusion_expr::EmitTo; use datafusion_physical_expr::binary_map::OutputType; use hashbrown::hash_table::HashTable; +use datafusion_execution::TaskContext; const 
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index 479bff001e3c8..37a1abfb5928c 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -23,14 +23,14 @@ pub mod bytes_view;
 pub mod primitive;
 
 use std::mem::{self, size_of};
-
+use std::sync::LazyLock;
 use crate::aggregates::group_values::GroupValues;
 use crate::aggregates::group_values::multi_group_by::{
     boolean::BooleanGroupValueBuilder, bytes::ByteGroupValueBuilder,
     bytes_view::ByteViewGroupValueBuilder, primitive::PrimitiveGroupValueBuilder,
 };
 use ahash::RandomState;
-use arrow::array::{Array, ArrayRef};
+use arrow::array::{Array, ArrayRef, RecordBatch};
 use arrow::compute::cast;
 use arrow::datatypes::{
     BinaryViewType, DataType, Date32Type, Date64Type, Decimal128Type, Float32Type,
@@ -47,10 +47,40 @@ use datafusion_expr::EmitTo;
 use datafusion_physical_expr::binary_map::OutputType;
 use hashbrown::hash_table::HashTable;
+use datafusion_execution::TaskContext;
 
 const NON_INLINED_FLAG: u64 = 0x8000000000000000;
 const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
 
+// Detected L1 data cache size; a mac reports 65536 here.
+fn l1_cache_size() -> usize {
+    static SIZE: LazyLock<usize> = LazyLock::new(|| cache_size::l1_cache_size().unwrap_or(
+        // 32KB
+        32 * 1024
+    ));
+
+    *SIZE
+}
+
+// Detected L2 cache size; a mac reports 4194304 here.
+fn l2_cache_size() -> usize {
+    static SIZE: LazyLock<usize> = LazyLock::new(|| cache_size::l2_cache_size().unwrap_or(
+        // 1MB
+        1024 * 1024
+    ));
+
+    *SIZE
+}
+
+// macs report no L3 at all, so the fallback effectively becomes the
+// threshold there; perhaps it should be much larger (e.g. 4194304 * 100)?
+fn l3_cache_size() -> usize {
+    static SIZE: LazyLock<usize> = LazyLock::new(|| cache_size::l3_cache_size().unwrap_or(
+        // 12MB
+        12 * 1024 * 1024
+    ));
+
+    *SIZE
+}
+
 /// Trait for storing a single column of group values in [`GroupValuesColumn`]
 ///
 /// Implementations of this trait store an in-progress collection of group values
@@ -220,6 +250,10 @@ pub struct GroupValuesColumn<const STREAMING: bool> {
     /// Random state for creating hashes
     random_state: RandomState,
+
+    agg_prefetch_elements: usize,
+    agg_prefetch_locality: usize,
+    agg_prefetch_read: bool,
 }
 
 /// Buffers to store intermediate results in `vectorized_append`
@@ -260,7 +294,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
     // ========================================================================
 
     /// Create a new instance of GroupValuesColumn if supported for the specified schema
-    pub fn try_new(schema: SchemaRef) -> Result<Self> {
+    pub fn try_new(schema: SchemaRef, ctx: Option<&TaskContext>) -> Result<Self> {
         let map = HashTable::with_capacity(0);
         Ok(Self {
             schema,
@@ -272,6 +306,9 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
             group_values: vec![],
             hashes_buffer: Default::default(),
             random_state: crate::aggregates::AGGREGATION_HASH_SEED,
+            agg_prefetch_elements: ctx.map(|c| c.session_config().options().execution.agg_prefetch_elements).unwrap_or(1),
+            agg_prefetch_locality: ctx.map(|c| c.session_config().options().execution.agg_prefetch_locality).unwrap_or(3),
+            agg_prefetch_read: ctx.map(|c| c.session_config().options().execution.agg_prefetch_read).unwrap_or(false),
         })
     }
@@ -491,74 +528,168 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
         batch_hashes: &[u64],
         groups: &mut [usize],
     ) {
+        // Gate prefetching on the configured cache level: only start once the
+        // hash table has outgrown that cache. For reference, a mac reports
+        // L1 = 65536 and L2 = 4194304 (sysctl hw.l1dcachesize), and no L3.
+        let should_prefetch = self.agg_prefetch_elements > 0
+            && match self.agg_prefetch_locality {
+                1 => self.map_size >= l1_cache_size(),
+                2 => self.map_size >= l2_cache_size(),
+                3 => self.map_size >= l3_cache_size(),
+                _ => true,
+            };
+
+        if !should_prefetch {
+            self.collect_vectorized_process_context_with_prefetch::<0>(batch_hashes, groups);
+            return;
+        }
+
+        // Map the runtime setting onto a const generic so the hot loop below
+        // is monomorphized per prefetch distance.
+        match self.agg_prefetch_elements {
+            0 => self.collect_vectorized_process_context_with_prefetch::<0>(batch_hashes, groups),
+            1 => self.collect_vectorized_process_context_with_prefetch::<1>(batch_hashes, groups),
+            2 => self.collect_vectorized_process_context_with_prefetch::<2>(batch_hashes, groups),
+            3 => self.collect_vectorized_process_context_with_prefetch::<3>(batch_hashes, groups),
+            4 => self.collect_vectorized_process_context_with_prefetch::<4>(batch_hashes, groups),
+            5 => self.collect_vectorized_process_context_with_prefetch::<5>(batch_hashes, groups),
+            6 => self.collect_vectorized_process_context_with_prefetch::<6>(batch_hashes, groups),
+            7 => self.collect_vectorized_process_context_with_prefetch::<7>(batch_hashes, groups),
+            8 => self.collect_vectorized_process_context_with_prefetch::<8>(batch_hashes, groups),
+            _ => self.collect_vectorized_process_context_with_prefetch::<8>(batch_hashes, groups),
+        }
+    }
+
+    fn collect_vectorized_process_context_with_prefetch<const PREFETCH: usize>(
+        &mut self,
+        batch_hashes: &[u64],
+        groups: &mut [usize],
+    ) {
+        if self.agg_prefetch_read {
+            self.collect_vectorized_process_context_with_prefetch_and_read::<PREFETCH, true>(batch_hashes, groups)
+        } else {
+            self.collect_vectorized_process_context_with_prefetch_and_read::<PREFETCH, false>(batch_hashes, groups)
+        }
+    }
+
+    fn collect_vectorized_process_context_with_prefetch_and_read<const PREFETCH: usize, const READ: bool>(
+        &mut self,
+        batch_hashes: &[u64],
+        groups: &mut [usize],
+    ) {
+        // Monomorphize over the locality hint as well.
+        match self.agg_prefetch_locality {
+            0 => self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<PREFETCH, READ, 0>(batch_hashes, groups),
+            1 => self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<PREFETCH, READ, 1>(batch_hashes, groups),
+            2 => self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<PREFETCH, READ, 2>(batch_hashes, groups),
+            3 => self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<PREFETCH, READ, 3>(batch_hashes, groups),
+            _ => self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<PREFETCH, READ, 3>(batch_hashes, groups),
+        }
+    }
+
+    fn collect_vectorized_process_context_with_prefetch_and_read_and_locality<
+        const PREFETCH: usize,
+        const READ: bool,
+        const LOCALITY: usize,
+    >(
+        &mut self,
+        batch_hashes: &[u64],
+        groups: &mut [usize],
+    ) {
         self.vectorized_operation_buffers.append_row_indices.clear();
         self.vectorized_operation_buffers
-            .equal_to_row_indices
-            .clear();
+        .equal_to_row_indices
+        .clear();
         self.vectorized_operation_buffers
-            .equal_to_group_indices
-            .clear();
+        .equal_to_group_indices
+        .clear();
+
+        if PREFETCH > 0 {
+            // Reserve up front so the inserts below cannot trigger a resize
+            // that would invalidate the prefetched buckets.
+            self.map.reserve(batch_hashes.len(), |(hash, _)| *hash);
+            let mut group_values_len = self.group_values[0].len();
+            if batch_hashes.is_empty() {
+                return;
+            }
 
-        let mut group_values_len = self.group_values[0].len();
-        for (row, &target_hash) in batch_hashes.iter().enumerate() {
-            let entry = self
-                .map
-                .find(target_hash, |(exist_hash, _)| target_hash == *exist_hash);
+            let batch_hashes_iter = batch_hashes[0..batch_hashes.len().saturating_sub(PREFETCH)].iter().enumerate();
 
-            let Some((_, group_index_view)) = entry else {
-                // 1. Bucket not found case
-                // Build `new inlined group index view`
-                let current_group_idx = group_values_len;
-                let group_index_view =
-                    GroupIndexView::new_inlined(current_group_idx as u64);
-
-                // Insert the `group index view` and its hash into `map`
-                // for hasher function, use precomputed hash value
-                self.map.insert_accounted(
-                    (target_hash, group_index_view),
-                    |(hash, _)| *hash,
-                    &mut self.map_size,
-                );
-
-                // Add row index to `vectorized_append_row_indices`
-                self.vectorized_operation_buffers
-                    .append_row_indices
-                    .push(row);
-
-                // Set group index to row in `groups`
-                groups[row] = current_group_idx;
-
-                group_values_len += 1;
-                continue;
-            };
+            for (row, &target_hash) in batch_hashes_iter {
+                // Prefetch the buckets the next PREFETCH hashes will probe.
+                for i in 1..=PREFETCH {
+                    if READ {
+                        self.map.prefetch_read::<LOCALITY>(batch_hashes[row + i]);
+                    } else {
+                        self.map.prefetch_write::<LOCALITY>(batch_hashes[row + i]);
+                    }
+                }
+                self.insert_entry(groups, &mut group_values_len, row, target_hash);
+            }
+            // Tail rows: nothing is left to prefetch for them.
+            for index in batch_hashes.len().saturating_sub(PREFETCH)..batch_hashes.len() {
+                self.insert_entry(groups, &mut group_values_len, index, batch_hashes[index]);
+            }
+        } else {
+            let mut group_values_len = self.group_values[0].len();
 
-            // 2. bucket found
-            // Check if the `group index view` is `inlined` or `non_inlined`
-            if group_index_view.is_non_inlined() {
-                // Non-inlined case, the value of view is offset in `group_index_lists`.
-                // We use it to get `group_index_list`, and add related `rows` and `group_indices`
-                // into `vectorized_equal_to_row_indices` and `vectorized_equal_to_group_indices`.
-                let list_offset = group_index_view.value() as usize;
-                let group_index_list = &self.group_index_lists[list_offset];
-
-                self.vectorized_operation_buffers
-                    .equal_to_group_indices
-                    .extend_from_slice(group_index_list);
-                self.vectorized_operation_buffers
-                    .equal_to_row_indices
-                    .extend(std::iter::repeat_n(row, group_index_list.len()));
-            } else {
-                let group_index = group_index_view.value() as usize;
-                self.vectorized_operation_buffers
-                    .equal_to_row_indices
-                    .push(row);
-                self.vectorized_operation_buffers
-                    .equal_to_group_indices
-                    .push(group_index);
+            for (row, &target_hash) in batch_hashes.iter().enumerate() {
+                self.insert_entry(groups, &mut group_values_len, row, target_hash);
             }
         }
     }
 
+    #[inline(always)]
+    fn insert_entry(
+        &mut self,
+        groups: &mut [usize],
+        group_values_len: &mut usize,
+        row: usize,
+        target_hash: u64,
+    ) {
+        let entry = self
+            .map
+            .find(target_hash, |(exist_hash, _)| target_hash == *exist_hash);
+
+        let Some((_, group_index_view)) = entry else {
+            // 1. Bucket not found case
+            // Build `new inlined group index view`
+            let current_group_idx = *group_values_len;
+            let group_index_view =
+                GroupIndexView::new_inlined(current_group_idx as u64);
+
+            // Insert the `group index view` and its hash into `map`
+            // for hasher function, use precomputed hash value
+            self.map.insert_accounted(
+                (target_hash, group_index_view),
+                |(hash, _)| *hash,
+                &mut self.map_size,
+            );
+
+            // Add row index to `vectorized_append_row_indices`
+            self.vectorized_operation_buffers
+                .append_row_indices
+                .push(row);
+
+            // Set group index to row in `groups`
+            groups[row] = current_group_idx;
+
+            *group_values_len += 1;
+            return;
+        };
+
+        // 2. bucket found
+        // Check if the `group index view` is `inlined` or `non_inlined`
+        if group_index_view.is_non_inlined() {
+            // Non-inlined case, the value of view is offset in `group_index_lists`.
+            // We use it to get `group_index_list`, and add related `rows` and `group_indices`
+            // into `vectorized_equal_to_row_indices` and `vectorized_equal_to_group_indices`.
+            let list_offset = group_index_view.value() as usize;
+            let group_index_list = &self.group_index_lists[list_offset];
+
+            self.vectorized_operation_buffers
+                .equal_to_group_indices
+                .extend_from_slice(group_index_list);
+            self.vectorized_operation_buffers
+                .equal_to_row_indices
+                .extend(std::iter::repeat_n(row, group_index_list.len()));
+        } else {
+            let group_index = group_index_view.value() as usize;
+            self.vectorized_operation_buffers
+                .equal_to_row_indices
+                .push(row);
+            self.vectorized_operation_buffers
+                .equal_to_group_indices
+                .push(group_index);
+        }
+    }
+
     /// Perform `vectorized_append`` for `rows` in `vectorized_append_row_indices`
     fn vectorized_append(&mut self, cols: &[ArrayRef]) -> Result<()> {
         if self
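Two things in the hunk above are worth spelling out: the runtime option is matched onto a const generic so each prefetch distance gets its own monomorphized loop, and the main loop stops `PREFETCH` elements short of the end so the lookahead never indexes out of bounds, with a plain tail loop finishing the rest. The `prefetch_read::<LOCALITY>` / `prefetch_write::<LOCALITY>` calls come from the author's hashbrown fork, not upstream hashbrown. A self-contained sketch of the same shape over a plain slice, using the stable `_mm_prefetch` intrinsic (x86-only, a no-op elsewhere); all names here are illustrative:

```rust
#[cfg(target_arch = "x86_64")]
use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};

/// Visit every element, hinting the cache about the element
/// `DISTANCE` iterations ahead of the current one.
fn for_each_with_prefetch<const DISTANCE: usize>(values: &[u64], mut visit: impl FnMut(u64)) {
    let split = values.len().saturating_sub(DISTANCE);
    for i in 0..split {
        #[cfg(target_arch = "x86_64")]
        unsafe {
            // In bounds by construction: i + DISTANCE < values.len().
            _mm_prefetch::<_MM_HINT_T0>(values.as_ptr().add(i + DISTANCE) as *const i8);
        }
        visit(values[i]);
    }
    // Tail: nothing left to prefetch.
    for &v in &values[split..] {
        visit(v);
    }
}

/// Map a runtime-configured distance onto a monomorphized instantiation,
/// mirroring the `match self.agg_prefetch_elements` dispatch above.
fn dispatch(distance: usize, values: &[u64], visit: impl FnMut(u64)) {
    match distance {
        0 => for_each_with_prefetch::<0>(values, visit),
        1 => for_each_with_prefetch::<1>(values, visit),
        _ => for_each_with_prefetch::<4>(values, visit),
    }
}

fn main() {
    let data: Vec<u64> = (0..1024).collect();
    let mut sum = 0u64;
    dispatch(4, &data, |v| sum += v);
    println!("{sum}");
}
```

The dispatch costs one branch per batch, while the loop body itself compiles with `PREFETCH` as a compile-time constant, so the inner `for i in 1..=PREFETCH` unrolls away.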
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index cb22fbf9a06a1..8fb8624c58df0 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -17,6 +17,7 @@
 
 //! Hash aggregation
 
+use std::ops::Deref;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::vec;
@@ -590,7 +591,7 @@ impl GroupedHashAggregateStream {
             _ => OutOfMemoryMode::ReportError,
         };
 
-        let group_values = new_group_values(group_schema, &group_ordering)?;
+        let group_values = new_group_values(group_schema, &group_ordering, Some(context.deref()))?;
         let reservation = MemoryConsumer::new(name)
             // We interpret 'can spill' as 'can handle memory back pressure'.
             // This value needs to be set to true for the default memory pool implementations
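Here `context` is an `Arc<TaskContext>`, so the call site needs an explicit conversion to `&TaskContext`; a plain `&context` would only give `&Arc<TaskContext>`. A toy illustration of the two equivalent spellings, with `String` standing in for `TaskContext`:

```rust
use std::ops::Deref;
use std::sync::Arc;

fn takes_option_ref(_ctx: Option<&String>) {}

fn main() {
    let ctx: Arc<String> = Arc::new("task context".to_owned());
    // What the patch does in row_hash.rs:
    takes_option_ref(Some(ctx.deref()));
    // Equivalent without importing `Deref` (recursive_query.rs below already
    // holds a plain `&TaskContext`, so it needs neither):
    takes_option_ref(Some(ctx.as_ref()));
}
```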
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index 3e7c75b0c8e85..7531ea0c6ca35 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -456,7 +456,7 @@ struct DistinctDeduplicator {
 
 impl DistinctDeduplicator {
     fn new(schema: SchemaRef, task_context: &TaskContext) -> Result<Self> {
-        let group_values = new_group_values(schema, &GroupOrdering::None)?;
+        let group_values = new_group_values(schema, &GroupOrdering::None, Some(task_context))?;
         let reservation = MemoryConsumer::new("RecursiveQueryHashTable")
             .register(task_context.memory_pool());
         Ok(Self {
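The cache thresholds in `multi_group_by/mod.rs` depend on what the `cache-size` crate can detect at runtime; where detection fails (such as L3 on a mac), the hard-coded fallbacks take over. A quick way to check what a given machine reports, using the same three crate functions the patch calls (add `cache-size` as a dependency to run it):

```rust
fn main() {
    // Each returns Option<usize>: None when the size cannot be detected.
    println!("L1d: {:?}", cache_size::l1_cache_size());
    println!("L2:  {:?}", cache_size::l2_cache_size());
    println!("L3:  {:?}", cache_size::l3_cache_size());
}
```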