diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index 2f3b1a19e7d73..9ce87b1a653b5 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -25,7 +25,7 @@ use arrow::array::types::{
 use arrow::array::{ArrayRef, downcast_primitive};
 use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
 use datafusion_common::Result;
-
+use datafusion_execution::metrics::ExecutionPlanMetricsSet;
 use datafusion_expr::EmitTo;
 
 pub mod multi_group_by;
@@ -134,6 +134,8 @@ pub trait GroupValues: Send {
 pub fn new_group_values(
     schema: SchemaRef,
     group_ordering: &GroupOrdering,
+    metrics: &ExecutionPlanMetricsSet,
+    partition: usize,
 ) -> Result<Box<dyn GroupValues>> {
     if schema.fields.len() == 1 {
         let d = schema.fields[0].data_type();
@@ -202,9 +204,13 @@ pub fn new_group_values(
 
     if multi_group_by::supported_schema(schema.as_ref()) {
         if matches!(group_ordering, GroupOrdering::None) {
-            Ok(Box::new(GroupValuesColumn::<false>::try_new(schema)?))
+            Ok(Box::new(GroupValuesColumn::<false>::try_new(
+                schema, metrics, partition,
+            )?))
         } else {
-            Ok(Box::new(GroupValuesColumn::<true>::try_new(schema)?))
+            Ok(Box::new(GroupValuesColumn::<true>::try_new(
+                schema, metrics, partition,
+            )?))
         }
     } else {
         Ok(Box::new(GroupValuesRows::try_new(schema)?))
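Reviewer note: the only API change in this file is the two extra arguments on `new_group_values`. A minimal sketch of a call site under the new signature (hypothetical caller, not from this patch; the `GroupOrdering` import path is assumed from this crate's layout):

    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_execution::metrics::ExecutionPlanMetricsSet;

    use crate::aggregates::group_values::new_group_values;
    use crate::aggregates::order::GroupOrdering;

    fn build_for_partition(partition: usize) -> Result<()> {
        // Two group columns select the multi_group_by path; GroupOrdering::None
        // then picks the non-streaming GroupValuesColumn::<false>.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let metrics = ExecutionPlanMetricsSet::new();
        let _group_values =
            new_group_values(schema, &GroupOrdering::None, &metrics, partition)?;
        Ok(())
    }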
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index 479bff001e3c8..03197fd92d8c4 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -46,11 +46,45 @@ use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
 use datafusion_expr::EmitTo;
 use datafusion_physical_expr::binary_map::OutputType;
 
+use datafusion_execution::metrics::{
+    ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time,
+};
 use hashbrown::hash_table::HashTable;
 
 const NON_INLINED_FLAG: u64 = 0x8000000000000000;
 const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
 
+pub(crate) struct MultiColumnGroupByMetrics {
+    /// Time spent hashing the grouping columns
+    pub(crate) time_hashing_grouping_columns: Time,
+
+    /// Time spent building and appending to the hash table for grouping columns
+    pub(crate) build_hash_table_time: Time,
+
+    /// Tracks the maximum number of entries the map has held
+    ///
+    /// A very large value probably indicates problems with fetching from the hash table
+    pub(crate) maximum_number_of_entries_in_map: Gauge,
+
+    /// Maximum hash map capacity
+    pub(crate) maximum_hash_map_capacity: Gauge,
+}
+
+impl MultiColumnGroupByMetrics {
+    pub(crate) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            time_hashing_grouping_columns: MetricBuilder::new(metrics)
+                .subset_time("time_hashing_grouping_columns", partition),
+            build_hash_table_time: MetricBuilder::new(metrics)
+                .subset_time("build_hash_table_time", partition),
+            maximum_number_of_entries_in_map: MetricBuilder::new(metrics)
+                .gauge("maximum_number_of_entries_in_map", partition),
+            maximum_hash_map_capacity: MetricBuilder::new(metrics)
+                .gauge("maximum_hash_map_capacity", partition),
+        }
+    }
+}
+
 /// Trait for storing a single column of group values in [`GroupValuesColumn`]
 ///
 /// Implementations of this trait store an in-progress collection of group values
@@ -220,6 +254,9 @@ pub struct GroupValuesColumn<const STREAMING: bool> {
     /// Random state for creating hashes
     random_state: RandomState,
+
+    /// Metrics for this group values column
+    metrics: MultiColumnGroupByMetrics,
 }
 
 /// Buffers to store intermediate results in `vectorized_append`
@@ -260,7 +297,11 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
     // ========================================================================
 
     /// Create a new instance of GroupValuesColumn if supported for the specified schema
-    pub fn try_new(schema: SchemaRef) -> Result<Self> {
+    pub fn try_new(
+        schema: SchemaRef,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
         let map = HashTable::with_capacity(0);
         Ok(Self {
             schema,
@@ -272,6 +313,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
             group_values: vec![],
             hashes_buffer: Default::default(),
             random_state: crate::aggregates::AGGREGATION_HASH_SEED,
+            metrics: MultiColumnGroupByMetrics::new(metrics, partition),
         })
     }
 
@@ -332,7 +374,12 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
         let batch_hashes = &mut self.hashes_buffer;
         batch_hashes.clear();
         batch_hashes.resize(n_rows, 0);
-        create_hashes(cols, &self.random_state, batch_hashes)?;
+        {
+            let _timer = self.metrics.time_hashing_grouping_columns.timer();
+            create_hashes(cols, &self.random_state, batch_hashes)?;
+        }
+
+        let build_hash_map_timer = self.metrics.build_hash_table_time.timer();
 
         for (row, &target_hash) in batch_hashes.iter().enumerate() {
             let entry = self
@@ -405,6 +452,15 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
             groups.push(group_idx);
         }
 
+        build_hash_map_timer.done();
+
+        self.metrics
+            .maximum_hash_map_capacity
+            .set_max(self.map.capacity());
+        self.metrics
+            .maximum_number_of_entries_in_map
+            .set_max(self.map.len());
+
         Ok(())
     }
 
@@ -433,7 +489,10 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
         let mut batch_hashes = mem::take(&mut self.hashes_buffer);
         batch_hashes.clear();
         batch_hashes.resize(n_rows, 0);
-        create_hashes(cols, &self.random_state, &mut batch_hashes)?;
+        {
+            let _timer = self.metrics.time_hashing_grouping_columns.timer();
+            create_hashes(cols, &self.random_state, &mut batch_hashes)?;
+        }
 
         // General steps for one round `vectorized equal_to & append`:
         // 1. Collect vectorized context by checking hash values of `cols` in `map`,
@@ -499,7 +558,11 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
             .equal_to_group_indices
             .clear();
 
-        let mut group_values_len = self.group_values[0].len();
+        let build_hash_table_timer = self.metrics.build_hash_table_time.timer();
+
+        let initial_group_values_len = self.group_values[0].len();
+        let mut group_values_len = initial_group_values_len;
+
         for (row, &target_hash) in batch_hashes.iter().enumerate() {
             let entry = self
                 .map
@@ -557,6 +620,14 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
                     .push(group_index);
             }
         }
+        build_hash_table_timer.done();
+
+        self.metrics
+            .maximum_number_of_entries_in_map
+            .set_max(self.map.len());
+        self.metrics
+            .maximum_hash_map_capacity
+            .set_max(self.map.capacity());
     }
 
     /// Perform `vectorized_append` for `rows` in `vectorized_append_row_indices`
@@ -679,7 +750,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
 
     /// It is possible that some `input rows` have the same
     /// hash values as the `exist rows`, but different
-    /// actual values the exists.
+    /// actual values (hash collisions).
     ///
     /// We can find them in `vectorized_equal_to`, and put them
     /// into `scalarized_indices`. And for these `input rows`,
@@ -1256,6 +1327,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use arrow::{compute::concat_batches, util::pretty::pretty_format_batches};
     use datafusion_common::utils::proxy::HashTableAllocExt;
+    use datafusion_execution::metrics::ExecutionPlanMetricsSet;
     use datafusion_expr::EmitTo;
 
     use crate::aggregates::group_values::{
@@ -1267,8 +1339,12 @@ mod tests {
     #[test]
     fn test_intern_for_vectorized_group_values() {
         let data_set = VectorizedTestDataSet::new();
-        let mut group_values =
-            GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
+        let mut group_values = GroupValuesColumn::<false>::try_new(
+            data_set.schema(),
+            &ExecutionPlanMetricsSet::new(),
+            0,
+        )
+        .unwrap();
         data_set.load_to_group_values(&mut group_values);
 
         let actual_batch = group_values.emit(EmitTo::All).unwrap();
@@ -1280,8 +1356,12 @@ mod tests {
     #[test]
     fn test_emit_first_n_for_vectorized_group_values() {
         let data_set = VectorizedTestDataSet::new();
-        let mut group_values =
-            GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
+        let mut group_values = GroupValuesColumn::<false>::try_new(
+            data_set.schema(),
+            &ExecutionPlanMetricsSet::new(),
+            0,
+        )
+        .unwrap();
 
         // 1~num_rows times to emit the groups
         let num_rows = data_set.expected_batch.num_rows();
@@ -1332,7 +1412,12 @@ mod tests {
         let field = Field::new_list_field(DataType::Int32, true);
         let schema = Arc::new(Schema::new_with_metadata(vec![field], HashMap::new()));
 
-        let mut group_values = GroupValuesColumn::<false>::try_new(schema).unwrap();
+        let mut group_values = GroupValuesColumn::<false>::try_new(
+            schema,
+            &ExecutionPlanMetricsSet::new(),
+            0,
+        )
+        .unwrap();
 
         // Insert group index views and check that each insert succeeds
         insert_inline_group_index_view(&mut group_values, 0, 0);
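Reviewer note: `MultiColumnGroupByMetrics` is pure instrumentation. The two `Time`s bracket the hashing and hash-table phases, and the two `Gauge`s record high-water marks via `set_max`. A standalone sketch of those semantics, assuming only the `Time`/`Gauge` API this patch itself uses (`timer()`, `done()`, `set_max()`, `clone_inner()`); the hash loop is a stand-in workload, not `create_hashes`:

    use datafusion_execution::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

    fn main() {
        let metrics = ExecutionPlanMetricsSet::new();
        let partition = 0;

        // Same registration pattern as MultiColumnGroupByMetrics::new
        let hash_time = MetricBuilder::new(&metrics)
            .subset_time("time_hashing_grouping_columns", partition);
        let max_entries = MetricBuilder::new(&metrics)
            .gauge("maximum_number_of_entries_in_map", partition);

        // The guard accumulates elapsed time into `hash_time` on `done()`
        // (or on drop), which is how each `create_hashes` call is timed above.
        let timer = hash_time.timer();
        let _hashes: Vec<u64> = (0..1_000u64)
            .map(|v| v.wrapping_mul(0x9E37_79B9_7F4A_7C15))
            .collect();
        timer.done();

        // `set_max` keeps a high-water mark: smaller later values are ignored,
        // so the gauge reflects the largest state the map ever reached.
        max_entries.set_max(8192);
        max_entries.set_max(1024); // no effect; 8192 is retained

        // Snapshot and print, roughly what EXPLAIN ANALYZE surfaces.
        println!("{}", metrics.clone_inner());
    }

Registering the times with `subset_time` rather than `elapsed_compute` marks them as subsets of the operator's compute time, so (assuming the usual DataFusion metric semantics) they are reported alongside it without being double counted into the total.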
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index cb22fbf9a06a1..265f0ff3306b7 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -590,7 +590,8 @@ impl GroupedHashAggregateStream {
             _ => OutOfMemoryMode::ReportError,
         };
-        let group_values = new_group_values(group_schema, &group_ordering)?;
+        let group_values =
+            new_group_values(group_schema, &group_ordering, &agg.metrics, partition)?;
 
         let reservation = MemoryConsumer::new(name)
             // We interpret 'can spill' as 'can handle memory back pressure'.
             // This value needs to be set to true for the default memory pool implementations
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index 683dbb4e49765..3d0d13d5e87a4 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -194,14 +194,14 @@ impl ExecutionPlan for RecursiveQueryExec {
         }
 
         let static_stream = self.static_term.execute(partition, Arc::clone(&context))?;
-        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         Ok(Box::pin(RecursiveQueryStream::new(
             context,
             Arc::clone(&self.work_table),
             Arc::clone(&self.recursive_term),
             static_stream,
             self.is_distinct,
-            baseline_metrics,
+            &self.metrics,
+            partition,
         )?))
     }
 
@@ -287,14 +287,24 @@ impl RecursiveQueryStream {
         recursive_term: Arc<dyn ExecutionPlan>,
         static_stream: SendableRecordBatchStream,
         is_distinct: bool,
-        baseline_metrics: BaselineMetrics,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
     ) -> Result<Self> {
         let schema = static_stream.schema();
         let reservation =
             MemoryConsumer::new("RecursiveQuery").register(task_context.memory_pool());
         let distinct_deduplicator = is_distinct
-            .then(|| DistinctDeduplicator::new(Arc::clone(&schema), &task_context))
+            .then(|| {
+                DistinctDeduplicator::new(
+                    Arc::clone(&schema),
+                    &task_context,
+                    metrics,
+                    partition,
+                )
+            })
             .transpose()?;
+        let baseline_metrics = BaselineMetrics::new(metrics, partition);
+
         Ok(Self {
             task_context,
             work_table,
@@ -453,8 +463,14 @@ struct DistinctDeduplicator {
 }
 
 impl DistinctDeduplicator {
-    fn new(schema: SchemaRef, task_context: &TaskContext) -> Result<Self> {
-        let group_values = new_group_values(schema, &GroupOrdering::None)?;
+    fn new(
+        schema: SchemaRef,
+        task_context: &TaskContext,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let group_values =
+            new_group_values(schema, &GroupOrdering::None, metrics, partition)?;
         let reservation = MemoryConsumer::new("RecursiveQueryHashTable")
             .register(task_context.memory_pool());
         Ok(Self {
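Reviewer note: `DistinctDeduplicator`'s body is untouched by this patch; it only gains the arguments needed to build its internal `GroupValues`. For context, a plausible shape of such a deduplicator built on the `GroupValues` trait (a hypothetical sketch, not the actual implementation; `distinct_mask` is invented for illustration): any row interned past the previous group count is a first occurrence.

    use arrow::array::{ArrayRef, BooleanArray};
    use datafusion_common::Result;

    use crate::aggregates::group_values::GroupValues;

    /// For each input row, report whether it is the first occurrence seen so far.
    fn distinct_mask(
        group_values: &mut dyn GroupValues,
        cols: &[ArrayRef],
    ) -> Result<BooleanArray> {
        let seen_before = group_values.len(); // groups interned before this batch
        let mut groups = Vec::with_capacity(cols[0].len());
        group_values.intern(cols, &mut groups)?; // assigns one group index per row
        // A row is new iff its group index lies past the old high mark.
        Ok(groups.iter().map(|&g| Some(g >= seen_before)).collect())
    }

Because the deduplicator now builds its hash table through `new_group_values`, recursive CTE distinct deduplication picks up the same per-partition hashing and hash-table metrics as aggregation at no extra cost.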