12 changes: 9 additions & 3 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -25,7 +25,7 @@ use arrow::array::types::{
use arrow::array::{ArrayRef, downcast_primitive};
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datafusion_common::Result;

use datafusion_execution::metrics::ExecutionPlanMetricsSet;
use datafusion_expr::EmitTo;

pub mod multi_group_by;
@@ -134,6 +134,8 @@ pub trait GroupValues: Send {
pub fn new_group_values(
schema: SchemaRef,
group_ordering: &GroupOrdering,
metrics: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<Box<dyn GroupValues>> {
if schema.fields.len() == 1 {
let d = schema.fields[0].data_type();
@@ -202,9 +204,13 @@ pub fn new_group_values(

if multi_group_by::supported_schema(schema.as_ref()) {
if matches!(group_ordering, GroupOrdering::None) {
Ok(Box::new(GroupValuesColumn::<false>::try_new(schema)?))
Ok(Box::new(GroupValuesColumn::<false>::try_new(
schema, metrics, partition,
)?))
} else {
Ok(Box::new(GroupValuesColumn::<true>::try_new(schema)?))
Ok(Box::new(GroupValuesColumn::<true>::try_new(
schema, metrics, partition,
)?))
}
} else {
Ok(Box::new(GroupValuesRows::try_new(schema)?))
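Callers of `new_group_values` now pass the operator's metrics set and its partition index. Below is a condensed, crate-internal sketch of the updated call site; the wrapper function and the `crate::` import paths are illustrative assumptions, not code from this PR.

```rust
// Illustrative sketch, written as if inside `datafusion-physical-plan`
// (the helper function itself is not part of the PR).
use arrow::datatypes::SchemaRef;
use datafusion_common::Result;
use datafusion_execution::metrics::ExecutionPlanMetricsSet;

// Assumed crate-internal paths, matching how the files in this diff use them.
use crate::aggregates::group_values::{new_group_values, GroupValues};
use crate::aggregates::order::GroupOrdering;

fn make_group_values(
    schema: SchemaRef,
    metrics: &ExecutionPlanMetricsSet,
    partition: usize,
) -> Result<Box<dyn GroupValues>> {
    // Before this PR: new_group_values(schema, &GroupOrdering::None)
    new_group_values(schema, &GroupOrdering::None, metrics, partition)
}
```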
@@ -46,11 +46,45 @@ use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
use datafusion_physical_expr::binary_map::OutputType;

use datafusion_execution::metrics::{
ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time,
};
use hashbrown::hash_table::HashTable;

const NON_INLINED_FLAG: u64 = 0x8000000000000000;
const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF;

pub(crate) struct MultiColumnGroupByMetrics {
/// Time spent hashing the grouping columns
pub(crate) time_hashing_grouping_columns: Time,

/// Time spent building and appending to the hash table for the grouping columns
pub(crate) build_hash_table_time: Time,

/// Tracks the maximum number of entries the map held
///
/// A very large value probably indicates problems with fetching from the hash table
pub(crate) maximum_number_of_entries_in_map: Gauge,

/// Maximum hash map capacity
pub(crate) maximum_hash_map_capacity: Gauge,
}

impl MultiColumnGroupByMetrics {
pub(crate) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
Self {
time_hashing_grouping_columns: MetricBuilder::new(metrics)
.subset_time("time_hashing_grouping_columns", partition),
build_hash_table_time: MetricBuilder::new(metrics)
.subset_time("build_hash_table_time", partition),
maximum_number_of_entries_in_map: MetricBuilder::new(metrics)
.gauge("maximum_number_of_entries_in_map", partition),
maximum_hash_map_capacity: MetricBuilder::new(metrics)
.gauge("maximum_hash_map_capacity", partition),
}
}
}

/// Trait for storing a single column of group values in [`GroupValuesColumn`]
///
/// Implementations of this trait store an in-progress collection of group values
@@ -220,6 +254,9 @@ pub struct GroupValuesColumn<const STREAMING: bool> {

/// Random state for creating hashes
random_state: RandomState,

/// Metrics for this group values column
metrics: MultiColumnGroupByMetrics,
}

/// Buffers to store intermediate results in `vectorized_append`
@@ -260,7 +297,11 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
// ========================================================================

/// Create a new instance of GroupValuesColumn if supported for the specified schema
pub fn try_new(schema: SchemaRef) -> Result<Self> {
pub fn try_new(
schema: SchemaRef,
metrics: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<Self> {
let map = HashTable::with_capacity(0);
Ok(Self {
schema,
@@ -272,6 +313,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
group_values: vec![],
hashes_buffer: Default::default(),
random_state: crate::aggregates::AGGREGATION_HASH_SEED,
metrics: MultiColumnGroupByMetrics::new(metrics, partition),
})
}

@@ -332,7 +374,12 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
let batch_hashes = &mut self.hashes_buffer;
batch_hashes.clear();
batch_hashes.resize(n_rows, 0);
create_hashes(cols, &self.random_state, batch_hashes)?;
{
let _timer = self.metrics.time_hashing_grouping_columns.timer();
create_hashes(cols, &self.random_state, batch_hashes)?;
}

let build_hash_map_timer = self.metrics.build_hash_table_time.timer();

for (row, &target_hash) in batch_hashes.iter().enumerate() {
let entry = self
@@ -405,6 +452,15 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
groups.push(group_idx);
}

build_hash_map_timer.done();

self.metrics
.maximum_hash_map_capacity
.set_max(self.map.capacity());
self.metrics
.maximum_number_of_entries_in_map
.set_max(self.map.len());

Ok(())
}

@@ -433,7 +489,10 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
let mut batch_hashes = mem::take(&mut self.hashes_buffer);
batch_hashes.clear();
batch_hashes.resize(n_rows, 0);
create_hashes(cols, &self.random_state, &mut batch_hashes)?;
{
let _timer = self.metrics.time_hashing_grouping_columns.timer();
create_hashes(cols, &self.random_state, &mut batch_hashes)?;
};

// General steps for one round `vectorized equal_to & append`:
// 1. Collect vectorized context by checking hash values of `cols` in `map`,
@@ -499,7 +558,11 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
.equal_to_group_indices
.clear();

let mut group_values_len = self.group_values[0].len();
let build_hash_table_timer = self.metrics.build_hash_table_time.timer();

let initial_group_values_len = self.group_values[0].len();
let mut group_values_len = initial_group_values_len;

for (row, &target_hash) in batch_hashes.iter().enumerate() {
let entry = self
.map
@@ -557,6 +620,14 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
.push(group_index);
}
}
build_hash_table_timer.done();

self.metrics
.maximum_number_of_entries_in_map
.set_max(self.map.len());
self.metrics
.maximum_hash_map_capacity
.set_max(self.map.capacity());
}

/// Perform `vectorized_append`` for `rows` in `vectorized_append_row_indices`
@@ -679,7 +750,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {

/// It is possible that some `input rows` have the same
/// hash values with the `exist rows`, but have the different
/// actual values the exists.
/// actual values than the existing rows (hash collision).
///
/// We can found them in `vectorized_equal_to`, and put them
/// into `scalarized_indices`. And for these `input rows`,
@@ -1256,6 +1327,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::{compute::concat_batches, util::pretty::pretty_format_batches};
use datafusion_common::utils::proxy::HashTableAllocExt;
use datafusion_execution::metrics::ExecutionPlanMetricsSet;
use datafusion_expr::EmitTo;

use crate::aggregates::group_values::{
@@ -1267,8 +1339,12 @@
#[test]
fn test_intern_for_vectorized_group_values() {
let data_set = VectorizedTestDataSet::new();
let mut group_values =
GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
let mut group_values = GroupValuesColumn::<false>::try_new(
data_set.schema(),
&ExecutionPlanMetricsSet::new(),
0,
)
.unwrap();

data_set.load_to_group_values(&mut group_values);
let actual_batch = group_values.emit(EmitTo::All).unwrap();
@@ -1280,8 +1356,12 @@
#[test]
fn test_emit_first_n_for_vectorized_group_values() {
let data_set = VectorizedTestDataSet::new();
let mut group_values =
GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
let mut group_values = GroupValuesColumn::<false>::try_new(
data_set.schema(),
&ExecutionPlanMetricsSet::new(),
0,
)
.unwrap();

// 1~num_rows times to emit the groups
let num_rows = data_set.expected_batch.num_rows();
@@ -1332,7 +1412,12 @@

let field = Field::new_list_field(DataType::Int32, true);
let schema = Arc::new(Schema::new_with_metadata(vec![field], HashMap::new()));
let mut group_values = GroupValuesColumn::<false>::try_new(schema).unwrap();
let mut group_values = GroupValuesColumn::<false>::try_new(
schema,
&ExecutionPlanMetricsSet::new(),
0,
)
.unwrap();

// Insert group index views and check if success to insert
insert_inline_group_index_view(&mut group_values, 0, 0);
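For context on the metric primitives used above, here is a small self-contained sketch of the `Time`/`Gauge` pattern, using the same import path and builder calls that appear in this diff; the metric values in it are made up for illustration.

```rust
use datafusion_execution::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

fn main() {
    let metrics = ExecutionPlanMetricsSet::new();
    let partition = 0;

    // Register a named Time and a Gauge for this partition, mirroring
    // MultiColumnGroupByMetrics::new above.
    let hash_time = MetricBuilder::new(&metrics)
        .subset_time("time_hashing_grouping_columns", partition);
    let max_entries = MetricBuilder::new(&metrics)
        .gauge("maximum_number_of_entries_in_map", partition);

    {
        // The guard returned by `timer()` adds the elapsed wall-clock time
        // to `hash_time` when it is dropped at the end of this scope.
        let _timer = hash_time.timer();
        // ... hash the grouping columns here ...
    }

    // `set_max` keeps a high-water mark: it only ever raises the stored value.
    max_entries.set_max(1024);
    max_entries.set_max(512); // no effect; 1024 is retained

    // Inspect what was recorded (the cloned MetricsSet implements Display).
    println!("{}", metrics.clone_inner());
}
```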
3 changes: 2 additions & 1 deletion datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -590,7 +590,8 @@ impl GroupedHashAggregateStream {
_ => OutOfMemoryMode::ReportError,
};

let group_values = new_group_values(group_schema, &group_ordering)?;
let group_values =
new_group_values(group_schema, &group_ordering, &agg.metrics, partition)?;
let reservation = MemoryConsumer::new(name)
// We interpret 'can spill' as 'can handle memory back pressure'.
// This value needs to be set to true for the default memory pool implementations
28 changes: 22 additions & 6 deletions datafusion/physical-plan/src/recursive_query.rs
@@ -194,14 +194,14 @@ impl ExecutionPlan for RecursiveQueryExec {
}

let static_stream = self.static_term.execute(partition, Arc::clone(&context))?;
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
Ok(Box::pin(RecursiveQueryStream::new(
context,
Arc::clone(&self.work_table),
Arc::clone(&self.recursive_term),
static_stream,
self.is_distinct,
baseline_metrics,
&self.metrics,
partition,
)?))
}

@@ -287,14 +287,24 @@ impl RecursiveQueryStream {
recursive_term: Arc<dyn ExecutionPlan>,
static_stream: SendableRecordBatchStream,
is_distinct: bool,
baseline_metrics: BaselineMetrics,
metrics: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<Self> {
let schema = static_stream.schema();
let reservation =
MemoryConsumer::new("RecursiveQuery").register(task_context.memory_pool());
let distinct_deduplicator = is_distinct
.then(|| DistinctDeduplicator::new(Arc::clone(&schema), &task_context))
.then(|| {
DistinctDeduplicator::new(
Arc::clone(&schema),
&task_context,
metrics,
partition,
)
})
.transpose()?;
let baseline_metrics = BaselineMetrics::new(metrics, partition);

Ok(Self {
task_context,
work_table,
@@ -453,8 +463,14 @@ struct DistinctDeduplicator {
}

impl DistinctDeduplicator {
fn new(schema: SchemaRef, task_context: &TaskContext) -> Result<Self> {
let group_values = new_group_values(schema, &GroupOrdering::None)?;
fn new(
schema: SchemaRef,
task_context: &TaskContext,
metrics: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<Self> {
let group_values =
new_group_values(schema, &GroupOrdering::None, metrics, partition)?;
let reservation = MemoryConsumer::new("RecursiveQueryHashTable")
.register(task_context.memory_pool());
Ok(Self {
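The recursive-query change follows the same pattern: rather than accepting a pre-built `BaselineMetrics`, the stream and the deduplicator take the raw metrics set plus the partition index and derive what they need internally. A minimal sketch of that constructor pattern follows, with hypothetical type names and assuming `BaselineMetrics` is importable from the same metrics module used elsewhere in this diff.

```rust
use datafusion_execution::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

// Hypothetical component; the name is not from the PR.
struct StreamParts {
    baseline: BaselineMetrics,
}

impl StreamParts {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        // Derive the per-partition metrics view inside the constructor,
        // instead of receiving it from the caller.
        Self {
            baseline: BaselineMetrics::new(metrics, partition),
        }
    }
}

fn main() {
    let metrics = ExecutionPlanMetricsSet::new();
    let parts = StreamParts::new(&metrics, 0);
    // The baseline tracks elapsed compute time, output rows, and similar counters.
    let _timer = parts.baseline.elapsed_compute().timer();
}
```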