From 3ab9486f9446cbe131c9203c9336651eecd1bd1e Mon Sep 17 00:00:00 2001
From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com>
Date: Thu, 25 Dec 2025 11:33:45 +0200
Subject: [PATCH 1/4] metrics: added more metrics for multi-column group by

---
 .../src/aggregates/group_values/mod.rs        |   8 +-
 .../group_values/multi_group_by/mod.rs        | 142 +++++++++++++++---
 .../physical-plan/src/aggregates/row_hash.rs  |   2 +-
 .../physical-plan/src/recursive_query.rs      |  17 ++-
 4 files changed, 142 insertions(+), 27 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index 2f3b1a19e7d73..a0e977e97fb69 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -25,7 +25,7 @@ use arrow::array::types::{
 use arrow::array::{ArrayRef, downcast_primitive};
 use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
 use datafusion_common::Result;
-
+use datafusion_execution::metrics::ExecutionPlanMetricsSet;
 use datafusion_expr::EmitTo;
 
 pub mod multi_group_by;
@@ -134,6 +134,8 @@ pub trait GroupValues: Send {
 pub fn new_group_values(
     schema: SchemaRef,
     group_ordering: &GroupOrdering,
+    metrics: &ExecutionPlanMetricsSet,
+    partition: usize,
 ) -> Result<Box<dyn GroupValues>> {
     if schema.fields.len() == 1 {
         let d = schema.fields[0].data_type();
@@ -202,9 +204,9 @@ pub fn new_group_values(
 
     if multi_group_by::supported_schema(schema.as_ref()) {
         if matches!(group_ordering, GroupOrdering::None) {
-            Ok(Box::new(GroupValuesColumn::<false>::try_new(schema)?))
+            Ok(Box::new(GroupValuesColumn::<false>::try_new(schema, metrics, partition)?))
         } else {
-            Ok(Box::new(GroupValuesColumn::<true>::try_new(schema)?))
+            Ok(Box::new(GroupValuesColumn::<true>::try_new(schema, metrics, partition)?))
         }
     } else {
         Ok(Box::new(GroupValuesRows::try_new(schema)?))
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index 479bff001e3c8..1c58fabf2a19a 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -47,10 +47,61 @@ use datafusion_expr::EmitTo;
 use datafusion_physical_expr::binary_map::OutputType;
 
 use hashbrown::hash_table::HashTable;
+use datafusion_execution::metrics::{Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time};
 
 const NON_INLINED_FLAG: u64 = 0x8000000000000000;
 const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
 
+
+pub(crate) struct MultiColumnGroupByMetrics {
+    /// Time spent hashing the grouping columns
+    pub(crate) time_hashing_grouping_columns: Time,
+
+    /// Time spent building the hash table for grouping columns
+    pub(crate) build_hash_table_time: Time,
+
+    /// Track the maximum amount of entries that map held
+    ///
+    /// Very large value will probably indicate
+    pub(crate) maximum_number_of_entries_in_map: Gauge,
+
+    /// This count is
+    pub(crate) number_of_rows_matching_unique_groups: Count,
+
+    pub(crate) number_of_rows_matching_existing_groups: Count,
+
+    pub(crate) number_of_hash_collisions: Count,
+
+    pub(crate) vectorized_append_time: Time,
+    pub(crate) vectorized_equal_to_time: Time,
+    pub(crate) scalarized_intern_remaining_time: Time,
+}
+
+impl MultiColumnGroupByMetrics {
+    pub(crate) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            time_hashing_grouping_columns: MetricBuilder::new(metrics)
+                .subset_time("time_hashing_grouping_columns",
partition), + build_hash_table_time: MetricBuilder::new(metrics) + .subset_time("build_hash_table_time", partition), + maximum_number_of_entries_in_map: MetricBuilder::new(metrics) + .gauge("maximum_number_of_entries_in_map", partition), + number_of_rows_matching_unique_groups: MetricBuilder::new(metrics) + .counter("number_of_rows_matching_unique_groups", partition), + number_of_rows_matching_existing_groups: MetricBuilder::new(metrics) + .counter("number_of_rows_matching_existing_groups", partition), + number_of_hash_collisions: MetricBuilder::new(metrics) + .counter("number_of_hash_collisions", partition), + vectorized_append_time: MetricBuilder::new(metrics) + .subset_time("vectorized_append_time", partition), + vectorized_equal_to_time: MetricBuilder::new(metrics) + .subset_time("vectorized_equal_to_time", partition), + scalarized_intern_remaining_time: MetricBuilder::new(metrics) + .subset_time("scalarized_intern_remaining_time", partition), + } + } +} + /// Trait for storing a single column of group values in [`GroupValuesColumn`] /// /// Implementations of this trait store an in-progress collection of group values @@ -220,6 +271,8 @@ pub struct GroupValuesColumn { /// Random state for creating hashes random_state: RandomState, + + metrics: MultiColumnGroupByMetrics, } /// Buffers to store intermediate results in `vectorized_append` @@ -254,13 +307,22 @@ impl VectorizedOperationBuffers { } } +struct CollectVectorizedContextResult { + /// How many new map entries + new_map_entries: usize, + + /// How many hashes map to existing entries. + /// this can either point to a hash collision or matching group value + existing_map_entries: usize, +} + impl GroupValuesColumn { // ======================================================================== // Initialization functions // ======================================================================== /// Create a new instance of GroupValuesColumn if supported for the specified schema - pub fn try_new(schema: SchemaRef) -> Result { + pub fn try_new(schema: SchemaRef, metrics: &ExecutionPlanMetricsSet, partition: usize) -> Result { let map = HashTable::with_capacity(0); Ok(Self { schema, @@ -272,6 +334,7 @@ impl GroupValuesColumn { group_values: vec![], hashes_buffer: Default::default(), random_state: crate::aggregates::AGGREGATION_HASH_SEED, + metrics: MultiColumnGroupByMetrics::new(metrics, partition), }) } @@ -430,10 +493,18 @@ impl GroupValuesColumn { groups.clear(); groups.resize(n_rows, usize::MAX); - let mut batch_hashes = mem::take(&mut self.hashes_buffer); - batch_hashes.clear(); - batch_hashes.resize(n_rows, 0); - create_hashes(cols, &self.random_state, &mut batch_hashes)?; + let batch_hashes = { + let _timer = self + .metrics + .time_hashing_grouping_columns + .timer(); + let mut batch_hashes = mem::take(&mut self.hashes_buffer); + batch_hashes.clear(); + batch_hashes.resize(n_rows, 0); + create_hashes(cols, &self.random_state, &mut batch_hashes)?; + + batch_hashes + }; // General steps for one round `vectorized equal_to & append`: // 1. Collect vectorized context by checking hash values of `cols` in `map`, @@ -456,17 +527,38 @@ impl GroupValuesColumn { // // 1. Collect vectorized context by checking hash values of `cols` in `map` - self.collect_vectorized_process_context(&batch_hashes, groups); + let result = self.collect_vectorized_process_context(&batch_hashes, groups); // 2. 
Perform `vectorized_append` - self.vectorized_append(cols)?; + { + let _timer = self.metrics.vectorized_append_time.timer(); + self.vectorized_append(cols)?; + } // 3. Perform `vectorized_equal_to` - self.vectorized_equal_to(cols, groups); + { + let _timer = self.metrics.vectorized_equal_to_time.timer(); + self.vectorized_equal_to(cols, groups); + } - // 4. Perform scalarized inter for remaining rows - // (about remaining rows, can see comments for `remaining_row_indices`) - self.scalarized_intern_remaining(cols, &batch_hashes, groups)?; + { + let number_of_hash_collisions = self.vectorized_operation_buffers + .remaining_row_indices.len(); + + assert!(result.existing_map_entries >= number_of_hash_collisions, "{} must be larger than the number of hash collisions {}", result.existing_map_entries, number_of_hash_collisions); + + let matching_group_values = result.existing_map_entries - number_of_hash_collisions; + self.metrics.number_of_rows_matching_unique_groups.add(result.new_map_entries); + self.metrics.number_of_rows_matching_existing_groups.add(matching_group_values); + self.metrics.number_of_hash_collisions.add(number_of_hash_collisions); + } + + { + let _timer = self.metrics.scalarized_intern_remaining_time.timer(); + // 4. Perform scalarized inter for remaining rows + // (about remaining rows, can see comments for `remaining_row_indices`) + self.scalarized_intern_remaining(cols, &batch_hashes, groups)?; + } self.hashes_buffer = batch_hashes; @@ -490,7 +582,7 @@ impl GroupValuesColumn { &mut self, batch_hashes: &[u64], groups: &mut [usize], - ) { + ) -> CollectVectorizedContextResult { self.vectorized_operation_buffers.append_row_indices.clear(); self.vectorized_operation_buffers .equal_to_row_indices @@ -499,7 +591,11 @@ impl GroupValuesColumn { .equal_to_group_indices .clear(); - let mut group_values_len = self.group_values[0].len(); + let build_hash_table_timer = self.metrics.build_hash_table_time.timer(); + + let initial_group_values_len = self.group_values[0].len(); + let mut group_values_len = initial_group_values_len; + for (row, &target_hash) in batch_hashes.iter().enumerate() { let entry = self .map @@ -557,6 +653,17 @@ impl GroupValuesColumn { .push(group_index); } } + build_hash_table_timer.done(); + + self.metrics.maximum_number_of_entries_in_map.set_max(self.map.len()); + + let new_groups_count = self.group_values[0].len() - initial_group_values_len; + let existing_groups_count = batch_hashes.len() - new_groups_count; + + CollectVectorizedContextResult { + new_map_entries: new_groups_count, + existing_map_entries: existing_groups_count + } } /// Perform `vectorized_append`` for `rows` in `vectorized_append_row_indices` @@ -679,7 +786,7 @@ impl GroupValuesColumn { /// It is possible that some `input rows` have the same /// hash values with the `exist rows`, but have the different - /// actual values the exists. + /// actual values the exists (hash collision). /// /// We can found them in `vectorized_equal_to`, and put them /// into `scalarized_indices`. 
And for these `input rows`,
@@ -1256,6 +1363,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use arrow::{compute::concat_batches, util::pretty::pretty_format_batches};
     use datafusion_common::utils::proxy::HashTableAllocExt;
+    use datafusion_execution::metrics::ExecutionPlanMetricsSet;
     use datafusion_expr::EmitTo;
 
     use crate::aggregates::group_values::{
@@ -1268,7 +1376,7 @@ mod tests {
     fn test_intern_for_vectorized_group_values() {
         let data_set = VectorizedTestDataSet::new();
         let mut group_values =
-            GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
+            GroupValuesColumn::<false>::try_new(data_set.schema(), &ExecutionPlanMetricsSet::new(), 0).unwrap();
         data_set.load_to_group_values(&mut group_values);
         let actual_batch = group_values.emit(EmitTo::All).unwrap();
@@ -1281,7 +1389,7 @@ mod tests {
     fn test_emit_first_n_for_vectorized_group_values() {
         let data_set = VectorizedTestDataSet::new();
         let mut group_values =
-            GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
+            GroupValuesColumn::<false>::try_new(data_set.schema(), &ExecutionPlanMetricsSet::new(), 0).unwrap();
 
         // 1~num_rows times to emit the groups
         let num_rows = data_set.expected_batch.num_rows();
@@ -1332,7 +1440,7 @@ mod tests {
         let field = Field::new_list_field(DataType::Int32, true);
         let schema = Arc::new(Schema::new_with_metadata(vec![field], HashMap::new()));
 
-        let mut group_values = GroupValuesColumn::<false>::try_new(schema).unwrap();
+        let mut group_values = GroupValuesColumn::<false>::try_new(schema, &ExecutionPlanMetricsSet::new(), 0).unwrap();
 
         // Insert group index views and check if success to insert
         insert_inline_group_index_view(&mut group_values, 0, 0);
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index cb22fbf9a06a1..3ad5394146a00 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -590,7 +590,7 @@ impl GroupedHashAggregateStream {
             _ => OutOfMemoryMode::ReportError,
         };
 
-        let group_values = new_group_values(group_schema, &group_ordering)?;
+        let group_values = new_group_values(group_schema, &group_ordering, &agg.metrics, partition)?;
         let reservation = MemoryConsumer::new(name)
             // We interpret 'can spill' as 'can handle memory back pressure'.
// This value needs to be set to true for the default memory pool implementations
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index 3e7c75b0c8e85..c2f8b03cf29ac 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -194,14 +194,14 @@ impl ExecutionPlan for RecursiveQueryExec {
         }
 
         let static_stream = self.static_term.execute(partition, Arc::clone(&context))?;
-        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         Ok(Box::pin(RecursiveQueryStream::new(
             context,
             Arc::clone(&self.work_table),
             Arc::clone(&self.recursive_term),
             static_stream,
             self.is_distinct,
-            baseline_metrics,
+            &self.metrics,
+            partition
         )?))
     }
 
@@ -287,14 +287,17 @@ impl RecursiveQueryStream {
         recursive_term: Arc<dyn ExecutionPlan>,
         static_stream: SendableRecordBatchStream,
         is_distinct: bool,
-        baseline_metrics: BaselineMetrics,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
     ) -> Result<Self> {
         let schema = static_stream.schema();
         let reservation =
             MemoryConsumer::new("RecursiveQuery").register(task_context.memory_pool());
         let distinct_deduplicator = is_distinct
-            .then(|| DistinctDeduplicator::new(Arc::clone(&schema), &task_context))
+            .then(|| DistinctDeduplicator::new(Arc::clone(&schema), &task_context, metrics, partition))
             .transpose()?;
+        let baseline_metrics = BaselineMetrics::new(metrics, partition);
+
         Ok(Self {
             task_context,
             work_table,
@@ -455,8 +458,10 @@ struct DistinctDeduplicator {
 }
 
 impl DistinctDeduplicator {
-    fn new(schema: SchemaRef, task_context: &TaskContext) -> Result<Self> {
-        let group_values = new_group_values(schema, &GroupOrdering::None)?;
+    fn new(schema: SchemaRef, task_context: &TaskContext,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,) -> Result<Self> {
+        let group_values = new_group_values(schema, &GroupOrdering::None, metrics, partition)?;
         let reservation = MemoryConsumer::new("RecursiveQueryHashTable")
             .register(task_context.memory_pool());
         Ok(Self {

From bffafce00eec72ca2f9cbd9a3fffd490eb9798ab Mon Sep 17 00:00:00 2001
From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com>
Date: Thu, 25 Dec 2025 12:06:07 +0200
Subject: [PATCH 2/4] fix borrow checker

---
 .../src/aggregates/group_values/multi_group_by/mod.rs | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index 1c58fabf2a19a..191f0438955df 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -531,13 +531,15 @@ impl GroupValuesColumn {
 
         // 2. Perform `vectorized_append`
         {
-            let _timer = self.metrics.vectorized_append_time.timer();
+            let time = self.metrics.vectorized_append_time.clone();
+            let _timer = time.timer();
             self.vectorized_append(cols)?;
         }
 
         // 3. Perform `vectorized_equal_to`
         {
-            let _timer = self.metrics.vectorized_equal_to_time.timer();
+            let time = self.metrics.vectorized_equal_to_time.clone();
+            let _timer = time.timer();
             self.vectorized_equal_to(cols, groups);
         }
 
@@ -554,7 +556,8 @@ impl GroupValuesColumn {
         }
 
         {
-            let _timer = self.metrics.scalarized_intern_remaining_time.timer();
+            let time = self.metrics.scalarized_intern_remaining_time.clone();
+            let _timer = time.timer();
             // 4.
Perform scalarized inter for remaining rows // (about remaining rows, can see comments for `remaining_row_indices`) self.scalarized_intern_remaining(cols, &batch_hashes, groups)?; From 7cb86a75416b53e67f4d12ae2455e916adea7dd5 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 28 Dec 2025 17:26:55 +0200 Subject: [PATCH 3/4] add timers --- .../group_values/multi_group_by/mod.rs | 113 +++++------------- 1 file changed, 32 insertions(+), 81 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 191f0438955df..f8782705986a1 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -52,29 +52,21 @@ use datafusion_execution::metrics::{Count, ExecutionPlanMetricsSet, Gauge, Metri const NON_INLINED_FLAG: u64 = 0x8000000000000000; const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF; - pub(crate) struct MultiColumnGroupByMetrics { /// Time spent hashing the grouping columns pub(crate) time_hashing_grouping_columns: Time, - /// Time spent building the hash table for grouping columns + /// Time spent building and appending the hash table for grouping columns pub(crate) build_hash_table_time: Time, - /// Track the maximum amount of entries that map held + /// Track the maximum number of entries that map held /// - /// Very large value will probably indicate + /// Very large value will probably indicate problems with fetching from the hash table pub(crate) maximum_number_of_entries_in_map: Gauge, - /// This count is - pub(crate) number_of_rows_matching_unique_groups: Count, - - pub(crate) number_of_rows_matching_existing_groups: Count, + /// Maximum hash map capacity + pub(crate) maximum_hash_map_capacity: Gauge, - pub(crate) number_of_hash_collisions: Count, - - pub(crate) vectorized_append_time: Time, - pub(crate) vectorized_equal_to_time: Time, - pub(crate) scalarized_intern_remaining_time: Time, } impl MultiColumnGroupByMetrics { @@ -85,19 +77,9 @@ impl MultiColumnGroupByMetrics { build_hash_table_time: MetricBuilder::new(metrics) .subset_time("build_hash_table_time", partition), maximum_number_of_entries_in_map: MetricBuilder::new(metrics) - .gauge("maximum_number_of_entries_in_map", partition), - number_of_rows_matching_unique_groups: MetricBuilder::new(metrics) - .counter("number_of_rows_matching_unique_groups", partition), - number_of_rows_matching_existing_groups: MetricBuilder::new(metrics) - .counter("number_of_rows_matching_existing_groups", partition), - number_of_hash_collisions: MetricBuilder::new(metrics) - .counter("number_of_hash_collisions", partition), - vectorized_append_time: MetricBuilder::new(metrics) - .subset_time("vectorized_append_time", partition), - vectorized_equal_to_time: MetricBuilder::new(metrics) - .subset_time("vectorized_equal_to_time", partition), - scalarized_intern_remaining_time: MetricBuilder::new(metrics) - .subset_time("scalarized_intern_remaining_time", partition), + .gauge("maximum_number_of_entries_in_map", partition), + maximum_hash_map_capacity: MetricBuilder::new(metrics) + .gauge("maximum_hash_map_capacity", partition), } } } @@ -272,6 +254,7 @@ pub struct GroupValuesColumn { /// Random state for creating hashes random_state: RandomState, + /// Metrics for this group values column metrics: MultiColumnGroupByMetrics, } @@ -307,15 +290,6 @@ impl VectorizedOperationBuffers { } } 
-struct CollectVectorizedContextResult { - /// How many new map entries - new_map_entries: usize, - - /// How many hashes map to existing entries. - /// this can either point to a hash collision or matching group value - existing_map_entries: usize, -} - impl GroupValuesColumn { // ======================================================================== // Initialization functions @@ -395,7 +369,12 @@ impl GroupValuesColumn { let batch_hashes = &mut self.hashes_buffer; batch_hashes.clear(); batch_hashes.resize(n_rows, 0); - create_hashes(cols, &self.random_state, batch_hashes)?; + { + let _timer = self.metrics.time_hashing_grouping_columns.timer(); + create_hashes(cols, &self.random_state, batch_hashes)?; + } + + let build_hash_map_timer = self.metrics.build_hash_table_time.timer(); for (row, &target_hash) in batch_hashes.iter().enumerate() { let entry = self @@ -468,6 +447,11 @@ impl GroupValuesColumn { groups.push(group_idx); } + build_hash_map_timer.done(); + + self.metrics.maximum_hash_map_capacity.set_max(self.map.capacity()); + self.metrics.maximum_number_of_entries_in_map.set_max(self.map.len()); + Ok(()) } @@ -493,17 +477,15 @@ impl GroupValuesColumn { groups.clear(); groups.resize(n_rows, usize::MAX); - let batch_hashes = { + let mut batch_hashes = mem::take(&mut self.hashes_buffer); + batch_hashes.clear(); + batch_hashes.resize(n_rows, 0); + { let _timer = self .metrics .time_hashing_grouping_columns .timer(); - let mut batch_hashes = mem::take(&mut self.hashes_buffer); - batch_hashes.clear(); - batch_hashes.resize(n_rows, 0); create_hashes(cols, &self.random_state, &mut batch_hashes)?; - - batch_hashes }; // General steps for one round `vectorized equal_to & append`: @@ -527,41 +509,17 @@ impl GroupValuesColumn { // // 1. Collect vectorized context by checking hash values of `cols` in `map` - let result = self.collect_vectorized_process_context(&batch_hashes, groups); + self.collect_vectorized_process_context(&batch_hashes, groups); // 2. Perform `vectorized_append` - { - let time = self.metrics.vectorized_append_time.clone(); - let _timer = time.timer(); - self.vectorized_append(cols)?; - } + self.vectorized_append(cols)?; // 3. Perform `vectorized_equal_to` - { - let time = self.metrics.vectorized_equal_to_time.clone(); - let _timer = time.timer(); - self.vectorized_equal_to(cols, groups); - } + self.vectorized_equal_to(cols, groups); - { - let number_of_hash_collisions = self.vectorized_operation_buffers - .remaining_row_indices.len(); - - assert!(result.existing_map_entries >= number_of_hash_collisions, "{} must be larger than the number of hash collisions {}", result.existing_map_entries, number_of_hash_collisions); - - let matching_group_values = result.existing_map_entries - number_of_hash_collisions; - self.metrics.number_of_rows_matching_unique_groups.add(result.new_map_entries); - self.metrics.number_of_rows_matching_existing_groups.add(matching_group_values); - self.metrics.number_of_hash_collisions.add(number_of_hash_collisions); - } - - { - let time = self.metrics.scalarized_intern_remaining_time.clone(); - let _timer = time.timer(); - // 4. Perform scalarized inter for remaining rows - // (about remaining rows, can see comments for `remaining_row_indices`) - self.scalarized_intern_remaining(cols, &batch_hashes, groups)?; - } + // 4. 
Perform scalarized inter for remaining rows + // (about remaining rows, can see comments for `remaining_row_indices`) + self.scalarized_intern_remaining(cols, &batch_hashes, groups)?; self.hashes_buffer = batch_hashes; @@ -585,7 +543,7 @@ impl GroupValuesColumn { &mut self, batch_hashes: &[u64], groups: &mut [usize], - ) -> CollectVectorizedContextResult { + ) { self.vectorized_operation_buffers.append_row_indices.clear(); self.vectorized_operation_buffers .equal_to_row_indices @@ -659,14 +617,7 @@ impl GroupValuesColumn { build_hash_table_timer.done(); self.metrics.maximum_number_of_entries_in_map.set_max(self.map.len()); - - let new_groups_count = self.group_values[0].len() - initial_group_values_len; - let existing_groups_count = batch_hashes.len() - new_groups_count; - - CollectVectorizedContextResult { - new_map_entries: new_groups_count, - existing_map_entries: existing_groups_count - } + self.metrics.maximum_hash_map_capacity.set_max(self.map.capacity()); } /// Perform `vectorized_append`` for `rows` in `vectorized_append_row_indices` From ed3c6e0ca394dd68ff7668513d2f767c4d51090f Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 28 Dec 2025 17:33:33 +0200 Subject: [PATCH 4/4] format and lint --- .../src/aggregates/group_values/mod.rs | 8 ++- .../group_values/multi_group_by/mod.rs | 59 +++++++++++++------ .../physical-plan/src/aggregates/row_hash.rs | 3 +- .../physical-plan/src/recursive_query.rs | 23 ++++++-- 4 files changed, 66 insertions(+), 27 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index a0e977e97fb69..9ce87b1a653b5 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -204,9 +204,13 @@ pub fn new_group_values( if multi_group_by::supported_schema(schema.as_ref()) { if matches!(group_ordering, GroupOrdering::None) { - Ok(Box::new(GroupValuesColumn::::try_new(schema, metrics, partition)?)) + Ok(Box::new(GroupValuesColumn::::try_new( + schema, metrics, partition, + )?)) } else { - Ok(Box::new(GroupValuesColumn::::try_new(schema, metrics, partition)?)) + Ok(Box::new(GroupValuesColumn::::try_new( + schema, metrics, partition, + )?)) } } else { Ok(Box::new(GroupValuesRows::try_new(schema)?)) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index f8782705986a1..03197fd92d8c4 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -46,8 +46,10 @@ use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt}; use datafusion_expr::EmitTo; use datafusion_physical_expr::binary_map::OutputType; +use datafusion_execution::metrics::{ + ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time, +}; use hashbrown::hash_table::HashTable; -use datafusion_execution::metrics::{Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time}; const NON_INLINED_FLAG: u64 = 0x8000000000000000; const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF; @@ -66,7 +68,6 @@ pub(crate) struct MultiColumnGroupByMetrics { /// Maximum hash map capacity pub(crate) maximum_hash_map_capacity: Gauge, - } impl MultiColumnGroupByMetrics { @@ -77,9 +78,9 @@ impl MultiColumnGroupByMetrics { build_hash_table_time: MetricBuilder::new(metrics) 
.subset_time("build_hash_table_time", partition), maximum_number_of_entries_in_map: MetricBuilder::new(metrics) - .gauge("maximum_number_of_entries_in_map", partition), + .gauge("maximum_number_of_entries_in_map", partition), maximum_hash_map_capacity: MetricBuilder::new(metrics) - .gauge("maximum_hash_map_capacity", partition), + .gauge("maximum_hash_map_capacity", partition), } } } @@ -296,7 +297,11 @@ impl GroupValuesColumn { // ======================================================================== /// Create a new instance of GroupValuesColumn if supported for the specified schema - pub fn try_new(schema: SchemaRef, metrics: &ExecutionPlanMetricsSet, partition: usize) -> Result { + pub fn try_new( + schema: SchemaRef, + metrics: &ExecutionPlanMetricsSet, + partition: usize, + ) -> Result { let map = HashTable::with_capacity(0); Ok(Self { schema, @@ -449,8 +454,12 @@ impl GroupValuesColumn { build_hash_map_timer.done(); - self.metrics.maximum_hash_map_capacity.set_max(self.map.capacity()); - self.metrics.maximum_number_of_entries_in_map.set_max(self.map.len()); + self.metrics + .maximum_hash_map_capacity + .set_max(self.map.capacity()); + self.metrics + .maximum_number_of_entries_in_map + .set_max(self.map.len()); Ok(()) } @@ -481,10 +490,7 @@ impl GroupValuesColumn { batch_hashes.clear(); batch_hashes.resize(n_rows, 0); { - let _timer = self - .metrics - .time_hashing_grouping_columns - .timer(); + let _timer = self.metrics.time_hashing_grouping_columns.timer(); create_hashes(cols, &self.random_state, &mut batch_hashes)?; }; @@ -616,8 +622,12 @@ impl GroupValuesColumn { } build_hash_table_timer.done(); - self.metrics.maximum_number_of_entries_in_map.set_max(self.map.len()); - self.metrics.maximum_hash_map_capacity.set_max(self.map.capacity()); + self.metrics + .maximum_number_of_entries_in_map + .set_max(self.map.len()); + self.metrics + .maximum_hash_map_capacity + .set_max(self.map.capacity()); } /// Perform `vectorized_append`` for `rows` in `vectorized_append_row_indices` @@ -1329,8 +1339,12 @@ mod tests { #[test] fn test_intern_for_vectorized_group_values() { let data_set = VectorizedTestDataSet::new(); - let mut group_values = - GroupValuesColumn::::try_new(data_set.schema(), &ExecutionPlanMetricsSet::new(), 0).unwrap(); + let mut group_values = GroupValuesColumn::::try_new( + data_set.schema(), + &ExecutionPlanMetricsSet::new(), + 0, + ) + .unwrap(); data_set.load_to_group_values(&mut group_values); let actual_batch = group_values.emit(EmitTo::All).unwrap(); @@ -1342,8 +1356,12 @@ mod tests { #[test] fn test_emit_first_n_for_vectorized_group_values() { let data_set = VectorizedTestDataSet::new(); - let mut group_values = - GroupValuesColumn::::try_new(data_set.schema(), &ExecutionPlanMetricsSet::new(), 0).unwrap(); + let mut group_values = GroupValuesColumn::::try_new( + data_set.schema(), + &ExecutionPlanMetricsSet::new(), + 0, + ) + .unwrap(); // 1~num_rows times to emit the groups let num_rows = data_set.expected_batch.num_rows(); @@ -1394,7 +1412,12 @@ mod tests { let field = Field::new_list_field(DataType::Int32, true); let schema = Arc::new(Schema::new_with_metadata(vec![field], HashMap::new())); - let mut group_values = GroupValuesColumn::::try_new(schema, &ExecutionPlanMetricsSet::new(), 0).unwrap(); + let mut group_values = GroupValuesColumn::::try_new( + schema, + &ExecutionPlanMetricsSet::new(), + 0, + ) + .unwrap(); // Insert group index views and check if success to insert insert_inline_group_index_view(&mut group_values, 0, 0); diff --git 
a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 3ad5394146a00..265f0ff3306b7 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -590,7 +590,8 @@ impl GroupedHashAggregateStream {
             _ => OutOfMemoryMode::ReportError,
         };
 
-        let group_values = new_group_values(group_schema, &group_ordering, &agg.metrics, partition)?;
+        let group_values =
+            new_group_values(group_schema, &group_ordering, &agg.metrics, partition)?;
         let reservation = MemoryConsumer::new(name)
             // We interpret 'can spill' as 'can handle memory back pressure'.
             // This value needs to be set to true for the default memory pool implementations
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index c2f8b03cf29ac..aa4007f67f6b6 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -201,7 +201,7 @@ impl ExecutionPlan for RecursiveQueryExec {
             static_stream,
             self.is_distinct,
             &self.metrics,
-            partition
+            partition,
         )?))
     }
 
@@ -294,7 +294,14 @@ impl RecursiveQueryStream {
         let reservation =
             MemoryConsumer::new("RecursiveQuery").register(task_context.memory_pool());
         let distinct_deduplicator = is_distinct
-            .then(|| DistinctDeduplicator::new(Arc::clone(&schema), &task_context, metrics, partition))
+            .then(|| {
+                DistinctDeduplicator::new(
+                    Arc::clone(&schema),
+                    &task_context,
+                    metrics,
+                    partition,
+                )
+            })
             .transpose()?;
         let baseline_metrics = BaselineMetrics::new(metrics, partition);
 
@@ -458,10 +465,14 @@ struct DistinctDeduplicator {
 }
 
 impl DistinctDeduplicator {
-    fn new(schema: SchemaRef, task_context: &TaskContext,
-        metrics: &ExecutionPlanMetricsSet,
-        partition: usize,) -> Result<Self> {
-        let group_values = new_group_values(schema, &GroupOrdering::None, metrics, partition)?;
+    fn new(
+        schema: SchemaRef,
+        task_context: &TaskContext,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let group_values =
+            new_group_values(schema, &GroupOrdering::None, metrics, partition)?;
         let reservation = MemoryConsumer::new("RecursiveQueryHashTable")
             .register(task_context.memory_pool());
         Ok(Self {
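
A note for reviewers on the `Time` handles used throughout this series: `Time::timer()` borrows the handle for as long as the returned guard lives, so holding a guard taken from `self.metrics.…` across a `&mut self` method call is rejected by the borrow checker. That is what PATCH 2/4 fixes: `Time` is a cheap clone of a handle around a shared counter, so cloning it first releases the borrow of `self` while the elapsed time still lands in the same metric. Below is a minimal, self-contained sketch of that pattern together with the `Gauge::set_max` high-water-mark usage from PATCH 3/4. The `Demo` type, its fields, and `main` are illustrative only and not part of the patches; the metric calls and import path mirror the ones the series itself uses.

use datafusion_execution::metrics::{ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time};

struct Demo {
    append_time: Time,
    max_entries: Gauge,
    rows: Vec<u64>,
}

impl Demo {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            // `subset_time` registers a timing metric reported as a subset
            // of the operator's total compute time
            append_time: MetricBuilder::new(metrics)
                .subset_time("vectorized_append_time", partition),
            max_entries: MetricBuilder::new(metrics)
                .gauge("maximum_number_of_entries_in_map", partition),
            rows: Vec::new(),
        }
    }

    fn append(&mut self, v: u64) {
        // Cloning the handle first is the PATCH 2/4 trick: with
        // `let _timer = self.append_time.timer();` the guard would keep an
        // immutable borrow of `self` alive and the `&mut self` call below
        // would fail to compile (E0502). The clone shares the same counter,
        // so the recorded time still accumulates into the same metric.
        let append_time = self.append_time.clone();
        let _timer = append_time.timer();
        self.push_row(v);
        // `set_max` only ever raises the gauge, so across many batches it
        // keeps the high-water mark rather than the latest value
        self.max_entries.set_max(self.rows.len());
    } // `_timer` drops here and adds the elapsed nanoseconds to `append_time`

    fn push_row(&mut self, v: u64) {
        self.rows.push(v);
    }
}

fn main() {
    let metrics = ExecutionPlanMetricsSet::new();
    let mut demo = Demo::new(&metrics, 0);
    for v in 0..3 {
        demo.append(v);
    }
    // The handles read back the accumulated values directly
    println!(
        "vectorized_append_time = {} ns, max entries = {}",
        demo.append_time.value(),
        demo.max_entries.value()
    );
}

The same reasoning explains why PATCH 3/4 can drop the clones again: after the per-phase timers are removed, the remaining guards (for example `build_hash_table_timer`, explicitly finished with `done()`) only overlap accesses to other fields of `self`, which the borrow checker splits, not calls that take `&mut self` as a whole.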