From 7d158c9bc6ba06cc793b2d5199214b202b6a79ca Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Fri, 26 Dec 2025 18:07:13 +0100 Subject: [PATCH] Fix over accounting in array accumulators using Arrow Memory Pool --- Cargo.lock | 10 ++ clippy.toml | 6 + datafusion-examples/Cargo.toml | 1 + .../examples/udf/advanced_udaf.rs | 5 +- .../examples/udf/simple_udaf.rs | 3 +- datafusion/common/src/scalar/mod.rs | 14 ++ datafusion/core/Cargo.toml | 1 + .../user_defined/user_defined_aggregates.rs | 10 +- .../user_defined_scalar_functions.rs | 2 +- datafusion/expr-common/Cargo.toml | 1 + datafusion/expr-common/src/accumulator.rs | 33 ++++- .../expr-common/src/groups_accumulator.rs | 32 +++- datafusion/ffi/Cargo.toml | 1 + datafusion/ffi/src/udaf/accumulator.rs | 15 +- datafusion/ffi/src/udaf/groups_accumulator.rs | 11 +- .../functions-aggregate-common/Cargo.toml | 1 + .../src/aggregate/avg_distinct/decimal.rs | 5 +- .../src/aggregate/avg_distinct/numeric.rs | 5 +- .../src/aggregate/count_distinct/bytes.rs | 5 +- .../src/aggregate/count_distinct/dict.rs | 5 +- .../src/aggregate/count_distinct/native.rs | 7 +- .../src/aggregate/groups_accumulator.rs | 17 ++- .../groups_accumulator/accumulate.rs | 3 +- .../aggregate/groups_accumulator/bool_op.rs | 5 +- .../aggregate/groups_accumulator/prim_op.rs | 5 +- .../src/aggregate/sum_distinct/numeric.rs | 5 +- .../functions-aggregate-common/src/min_max.rs | 5 +- .../src/noop_accumulator.rs | 3 +- .../functions-aggregate-common/src/utils.rs | 3 +- datafusion/functions-aggregate/Cargo.toml | 1 + .../src/approx_distinct.rs | 3 +- .../src/approx_percentile_cont.rs | 3 +- .../src/approx_percentile_cont_with_weight.rs | 5 +- .../functions-aggregate/src/array_agg.rs | 137 ++++++++++++------ datafusion/functions-aggregate/src/average.rs | 9 +- .../functions-aggregate/src/bit_and_or_xor.rs | 9 +- .../functions-aggregate/src/bool_and_or.rs | 5 +- .../functions-aggregate/src/correlation.rs | 11 +- datafusion/functions-aggregate/src/count.rs | 85 
+++++++++-- .../functions-aggregate/src/covariance.rs | 3 +- .../functions-aggregate/src/first_last.rs | 119 ++++++++++++--- datafusion/functions-aggregate/src/median.rs | 9 +- datafusion/functions-aggregate/src/min_max.rs | 5 +- .../src/min_max/min_max_bytes.rs | 7 +- .../src/min_max/min_max_struct.rs | 7 +- .../functions-aggregate/src/nth_value.rs | 5 +- .../src/percentile_cont.rs | 9 +- datafusion/functions-aggregate/src/regr.rs | 3 +- datafusion/functions-aggregate/src/stddev.rs | 9 +- .../functions-aggregate/src/string_agg.rs | 7 +- datafusion/functions-aggregate/src/sum.rs | 7 +- datafusion/functions-aggregate/src/utils.rs | 13 ++ .../functions-aggregate/src/variance.rs | 5 +- datafusion/physical-plan/Cargo.toml | 1 + .../src/aggregates/no_grouping.rs | 9 +- .../physical-plan/src/aggregates/row_hash.rs | 22 ++- datafusion/proto/Cargo.toml | 1 + .../tests/cases/roundtrip_logical_plan.rs | 5 +- .../tests/cases/roundtrip_physical_plan.rs | 3 +- datafusion/spark/Cargo.toml | 1 + .../spark/src/function/aggregate/avg.rs | 5 +- .../spark/src/function/aggregate/try_sum.rs | 3 +- datafusion/substrait/Cargo.toml | 1 + .../tests/cases/roundtrip_logical_plan.rs | 3 +- 64 files changed, 544 insertions(+), 205 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 22ec582536069..e35c2f9a47973 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1833,6 +1833,7 @@ name = "datafusion" version = "51.0.0" dependencies = [ "arrow", + "arrow-buffer", "arrow-schema", "async-trait", "bytes", @@ -2194,6 +2195,7 @@ name = "datafusion-examples" version = "51.0.0" dependencies = [ "arrow", + "arrow-buffer", "arrow-flight", "arrow-schema", "async-trait", @@ -2278,6 +2280,7 @@ name = "datafusion-expr-common" version = "51.0.0" dependencies = [ "arrow", + "arrow-buffer", "datafusion-common", "indexmap 2.12.1", "itertools 0.14.0", @@ -2290,6 +2293,7 @@ version = "51.0.0" dependencies = [ "abi_stable", "arrow", + "arrow-buffer", "arrow-schema", "async-ffi", "async-trait", @@ -2356,6 +2360,7 @@ 
version = "51.0.0" dependencies = [ "ahash 0.8.12", "arrow", + "arrow-buffer", "criterion", "datafusion-common", "datafusion-doc", @@ -2377,6 +2382,7 @@ version = "51.0.0" dependencies = [ "ahash 0.8.12", "arrow", + "arrow-buffer", "criterion", "datafusion-common", "datafusion-expr-common", @@ -2559,6 +2565,7 @@ version = "51.0.0" dependencies = [ "ahash 0.8.12", "arrow", + "arrow-buffer", "arrow-ord", "arrow-schema", "async-trait", @@ -2594,6 +2601,7 @@ name = "datafusion-proto" version = "51.0.0" dependencies = [ "arrow", + "arrow-buffer", "async-trait", "chrono", "datafusion", @@ -2673,6 +2681,7 @@ name = "datafusion-spark" version = "51.0.0" dependencies = [ "arrow", + "arrow-buffer", "bigdecimal", "chrono", "crc32fast", @@ -2754,6 +2763,7 @@ dependencies = [ name = "datafusion-substrait" version = "51.0.0" dependencies = [ + "arrow-buffer", "async-recursion", "async-trait", "chrono", diff --git a/clippy.toml b/clippy.toml index ea3609b574c06..ed4c347d675ce 100644 --- a/clippy.toml +++ b/clippy.toml @@ -7,6 +7,12 @@ disallowed-types = [ { path = "std::time::Instant", reason = "Use `datafusion_common::instant::Instant` instead for WASM compatibility" }, ] +ignore-interior-mutability = [ + # arrow_buffer::Bytes contains Mutex for memory pool tracking, + # but this doesn't affect Hash/Eq implementations (those only depend on data content). + "arrow_buffer::bytes::Bytes", +] + # Lowering the threshold to help prevent stack overflows (default is 16384) # See: https://rust-lang.github.io/rust-clippy/master/index.html#/large_futures future-size-threshold = 10000 diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index b0190dadf3c3f..15c99c6754162 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -37,6 +37,7 @@ workspace = true [dev-dependencies] arrow = { workspace = true } +arrow-buffer = { workspace = true, features = ["pool"] } # arrow_schema is required for record_batch! 
macro :sad: arrow-flight = { workspace = true } arrow-schema = { workspace = true } diff --git a/datafusion-examples/examples/udf/advanced_udaf.rs b/datafusion-examples/examples/udf/advanced_udaf.rs index fbb9e652486ce..9fd21e0ff108c 100644 --- a/datafusion-examples/examples/udf/advanced_udaf.rs +++ b/datafusion-examples/examples/udf/advanced_udaf.rs @@ -18,6 +18,7 @@ //! See `main.rs` for how to run it. use arrow::datatypes::{Field, Schema}; +use arrow_buffer::MemoryPool; use datafusion::physical_expr::NullState; use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; use std::{any::Any, sync::Arc}; @@ -195,7 +196,7 @@ impl Accumulator for GeometricMean { }) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } } @@ -362,7 +363,7 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator { ]) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { self.counts.capacity() * size_of::() + self.prods.capacity() * size_of::() } diff --git a/datafusion-examples/examples/udf/simple_udaf.rs b/datafusion-examples/examples/udf/simple_udaf.rs index 42ea0054b759f..28ed578ed7c60 100644 --- a/datafusion-examples/examples/udf/simple_udaf.rs +++ b/datafusion-examples/examples/udf/simple_udaf.rs @@ -17,6 +17,7 @@ //! See `main.rs` for how to run it. //! +use arrow_buffer::MemoryPool; /// In this example we will declare a single-type, single return type UDAF that computes the geometric mean. 
/// The geometric mean is described here: https://en.wikipedia.org/wiki/Geometric_mean use datafusion::arrow::{ @@ -132,7 +133,7 @@ impl Accumulator for GeometricMean { }) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } } diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 859b32443095f..a81809d051410 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -3418,6 +3418,20 @@ impl ScalarValue { } } + /// Returns the inner ArrayRef if this ScalarValue contains one + /// (List, LargeList, FixedSizeList, Struct, Map variants). + /// Returns None for primitive types. + pub fn get_array_ref(&self) -> Option { + match self { + ScalarValue::List(arr) => Some(Arc::clone(arr) as ArrayRef), + ScalarValue::LargeList(arr) => Some(Arc::clone(arr) as ArrayRef), + ScalarValue::FixedSizeList(arr) => Some(Arc::clone(arr) as ArrayRef), + ScalarValue::Struct(arr) => Some(Arc::clone(arr) as ArrayRef), + ScalarValue::Map(arr) => Some(Arc::clone(arr) as ArrayRef), + _ => None, + } + } + /// Converts a value in `array` at `index` into a ScalarValue pub fn try_from_array(array: &dyn Array, index: usize) -> Result { // handle NULL value diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index bd88ed3b9ca1e..af1632e7c3042 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -112,6 +112,7 @@ extended_tests = [] [dependencies] arrow = { workspace = true } +arrow-buffer = { workspace = true, features = ["pool"] } arrow-schema = { workspace = true, features = ["canonical_extension_types"] } async-trait = { workspace = true } bytes = { workspace = true } diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs index e7bd2241398ad..95f5701354444 100644 --- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs +++ 
b/datafusion/core/tests/user_defined/user_defined_aggregates.rs @@ -629,7 +629,7 @@ impl Accumulator for TimeSum { Ok(ScalarValue::TimestampNanosecond(Some(self.sum), None)) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn arrow_buffer::MemoryPool>) -> usize { // accurate size estimates are not important for this example 42 } @@ -781,7 +781,7 @@ impl Accumulator for FirstSelector { self.update_batch(states) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn arrow_buffer::MemoryPool>) -> usize { size_of_val(self) } } @@ -835,7 +835,7 @@ impl Accumulator for TestGroupsAccumulator { Ok(ScalarValue::from(self.result)) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn arrow_buffer::MemoryPool>) -> usize { size_of::() } @@ -883,7 +883,7 @@ impl GroupsAccumulator for TestGroupsAccumulator { Ok(()) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn arrow_buffer::MemoryPool>) -> usize { size_of::() } } @@ -999,7 +999,7 @@ impl Accumulator for MetadataBasedAccumulator { Ok(ScalarValue::from(v)) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn arrow_buffer::MemoryPool>) -> usize { 9 } diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index b86cd94a8a9b7..896d0390bab5a 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -365,7 +365,7 @@ async fn udaf_as_window_func() -> Result<()> { unimplemented!() } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn arrow_buffer::MemoryPool>) -> usize { unimplemented!() } } diff --git a/datafusion/expr-common/Cargo.toml b/datafusion/expr-common/Cargo.toml index 5ee46b454e791..f6f3860c3c44b 100644 --- a/datafusion/expr-common/Cargo.toml +++ b/datafusion/expr-common/Cargo.toml @@ -42,6 +42,7 @@ name = "datafusion_expr_common" [dependencies] arrow = 
{ workspace = true } +arrow-buffer = { workspace = true, features = ["pool"] } datafusion-common = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true } diff --git a/datafusion/expr-common/src/accumulator.rs b/datafusion/expr-common/src/accumulator.rs index fc4e90114beea..e03eb962af67d 100644 --- a/datafusion/expr-common/src/accumulator.rs +++ b/datafusion/expr-common/src/accumulator.rs @@ -18,6 +18,7 @@ //! Accumulator module contains the trait definition for aggregation function's accumulators. use arrow::array::ArrayRef; +use arrow_buffer::MemoryPool; use datafusion_common::{Result, ScalarValue, internal_err}; use std::fmt::Debug; @@ -71,15 +72,33 @@ pub trait Accumulator: Send + Sync + Debug { /// when possible (for example distinct strings) fn evaluate(&mut self) -> Result; - /// Returns the allocated size required for this accumulator, in - /// bytes, including `Self`. + /// Returns the allocated memory size required by this accumulator, in bytes. /// - /// This value is used to calculate the memory used during - /// execution so DataFusion can stay within its allotted limit. + /// This value is used to calculate the memory used during execution so + /// DataFusion can stay within its allocated memory limit. /// - /// "Allocated" means that for internal containers such as `Vec`, - /// the `capacity` should be used not the `len`. - fn size(&self) -> usize; + /// # Memory Pool for Shared Buffers + /// + /// When `pool` is `None`: + /// - Returns the total memory size including Arrow buffers + /// - This is the default behavior and works for most use cases + /// + /// When `pool` is `Some`: + /// - Returns only the structural size (the accumulator struct, Vec capacities, etc.) 
+ /// - Claims Arrow buffers with the provided [`MemoryPool`] for tracking + /// - It is recommended when accumulators may share Arrow array buffers, as the pool + /// automatically deduplicates shared buffers to provide accurate memory accounting + /// - Callers should add `pool.used()` to this return value for the total memory + /// + /// # Example + /// + /// ```ignore + /// // With memory pool for accurate tracking when buffers may be shared + /// let pool = TrackingMemoryPool::default(); + /// let structural_size = accumulator.size(Some(&pool)); + /// let total_memory = structural_size + pool.used(); + /// ``` + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize; /// Returns the intermediate state of the accumulator, consuming the /// intermediate state. diff --git a/datafusion/expr-common/src/groups_accumulator.rs b/datafusion/expr-common/src/groups_accumulator.rs index 860e69245a7fd..650125d97dfd2 100644 --- a/datafusion/expr-common/src/groups_accumulator.rs +++ b/datafusion/expr-common/src/groups_accumulator.rs @@ -18,6 +18,7 @@ //! Vectorized [`GroupsAccumulator`] use arrow::array::{ArrayRef, BooleanArray}; +use arrow_buffer::MemoryPool; use datafusion_common::{Result, not_impl_err}; /// Describes how many rows should be emitted during grouping. @@ -244,10 +245,31 @@ pub trait GroupsAccumulator: Send { false } - /// Amount of memory used to store the state of this accumulator, - /// in bytes. + /// Returns the allocated memory size required by this accumulator, in bytes. /// - /// This function is called once per batch, so it should be `O(n)` to - /// compute, not `O(num_groups)` - fn size(&self) -> usize; + /// This value is used to calculate the memory used during execution so + /// DataFusion can stay within its allocated memory limit. 
+ /// + /// # Memory Pool for Shared Buffers + /// + /// When `pool` is `None`: + /// - Returns the total memory size including Arrow buffers + /// - This is the default behavior and works for most use cases + /// + /// When `pool` is `Some`: + /// - Returns only the structural size (the accumulator struct, Vec capacities, etc.) + /// - Claims Arrow buffers with the provided [`MemoryPool`] for tracking + /// - It is recommended when accumulators may share Arrow array buffers, as the pool + /// automatically deduplicates shared buffers to provide accurate memory accounting + /// - Callers should add `pool.used()` to this return value for the total memory + /// + /// # Example + /// + /// ```ignore + /// // With memory pool for accurate tracking when buffers may be shared + /// let pool = TrackingMemoryPool::default(); + /// let structural_size = accumulator.size(Some(&pool)); + /// let total_memory = structural_size + pool.used(); + /// ``` + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize; } diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index 28e1b2ee5681f..ca4f22c5c6ec6 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -46,6 +46,7 @@ crate-type = ["cdylib", "rlib"] [dependencies] abi_stable = "0.11.3" arrow = { workspace = true, features = ["ffi"] } +arrow-buffer = { workspace = true, features = ["pool"] } arrow-schema = { workspace = true } async-ffi = { version = "0.5.0", features = ["abi_stable"] } async-trait = { workspace = true } diff --git a/datafusion/ffi/src/udaf/accumulator.rs b/datafusion/ffi/src/udaf/accumulator.rs index 44bfc5bdeb6cd..996480fcb9f48 100644 --- a/datafusion/ffi/src/udaf/accumulator.rs +++ b/datafusion/ffi/src/udaf/accumulator.rs @@ -23,6 +23,7 @@ use abi_stable::StableAbi; use abi_stable::std_types::{RResult, RVec}; use arrow::array::ArrayRef; use arrow::error::ArrowError; +use arrow_buffer::MemoryPool; use datafusion_common::error::{DataFusionError, Result}; use 
datafusion_common::scalar::ScalarValue; use datafusion_expr::Accumulator; @@ -133,7 +134,7 @@ unsafe extern "C" fn evaluate_fn_wrapper( } unsafe extern "C" fn size_fn_wrapper(accumulator: &FFI_Accumulator) -> usize { - unsafe { accumulator.inner().size() } + unsafe { accumulator.inner().size(None) } } unsafe extern "C" fn state_fn_wrapper( @@ -284,7 +285,7 @@ impl Accumulator for ForeignAccumulator { } } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { unsafe { (self.accumulator.size)(&self.accumulator) } } @@ -352,7 +353,7 @@ mod tests { #[test] fn test_foreign_avg_accumulator() -> Result<()> { let original_accum = AvgAccumulator::default(); - let original_size = original_accum.size(); + let original_size = original_accum.size(None); let original_supports_retract = original_accum.supports_retract_batch(); let boxed_accum: Box = Box::new(original_accum); @@ -390,7 +391,7 @@ mod tests { let avg = foreign_accum.evaluate()?; assert_eq!(avg, ScalarValue::Float64(Some(30.0))); - assert_eq!(original_size, foreign_accum.size()); + assert_eq!(original_size, foreign_accum.size(None)); assert_eq!( original_supports_retract, foreign_accum.supports_retract_batch() @@ -403,7 +404,7 @@ mod tests { fn test_ffi_accumulator_local_bypass() -> Result<()> { let original_accum = AvgAccumulator::default(); let boxed_accum: Box = Box::new(original_accum); - let original_size = boxed_accum.size(); + let original_size = boxed_accum.size(None); let ffi_accum: FFI_Accumulator = boxed_accum.into(); @@ -412,7 +413,7 @@ mod tests { unsafe { let concrete = &*(foreign_accum.as_ref() as *const dyn Accumulator as *const AvgAccumulator); - assert_eq!(original_size, concrete.size()); + assert_eq!(original_size, concrete.size(None)); } // Verify different library markers generate foreign accumulator @@ -424,7 +425,7 @@ mod tests { unsafe { let concrete = &*(foreign_accum.as_ref() as *const dyn Accumulator as *const ForeignAccumulator); - assert_eq!(original_size, 
concrete.size()); + assert_eq!(original_size, concrete.size(None)); } Ok(()) diff --git a/datafusion/ffi/src/udaf/groups_accumulator.rs b/datafusion/ffi/src/udaf/groups_accumulator.rs index 73a6befd819a0..5c453b330b252 100644 --- a/datafusion/ffi/src/udaf/groups_accumulator.rs +++ b/datafusion/ffi/src/udaf/groups_accumulator.rs @@ -25,6 +25,7 @@ use abi_stable::std_types::{ROption, RVec}; use arrow::array::{Array, ArrayRef, BooleanArray}; use arrow::error::ArrowError; use arrow::ffi::to_ffi; +use arrow_buffer::MemoryPool; use datafusion_common::error::{DataFusionError, Result}; use datafusion_expr::{EmitTo, GroupsAccumulator}; @@ -168,7 +169,7 @@ unsafe extern "C" fn evaluate_fn_wrapper( unsafe extern "C" fn size_fn_wrapper(accumulator: &FFI_GroupsAccumulator) -> usize { unsafe { let accumulator = accumulator.inner(); - accumulator.size() + accumulator.size(None) } } @@ -335,7 +336,7 @@ impl GroupsAccumulator for ForeignGroupsAccumulator { } } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { unsafe { (self.accumulator.size)(&self.accumulator) } } @@ -552,7 +553,7 @@ mod tests { fn test_ffi_groups_accumulator_local_bypass_inner() -> Result<()> { let original_accum = StddevGroupsAccumulator::new(StatsType::Population); let boxed_accum: Box = Box::new(original_accum); - let original_size = boxed_accum.size(); + let original_size = boxed_accum.size(None); let ffi_accum: FFI_GroupsAccumulator = boxed_accum.into(); @@ -561,7 +562,7 @@ mod tests { unsafe { let concrete = &*(foreign_accum.as_ref() as *const dyn GroupsAccumulator as *const StddevGroupsAccumulator); - assert_eq!(original_size, concrete.size()); + assert_eq!(original_size, concrete.size(None)); } // Verify different library markers generate foreign accumulator @@ -573,7 +574,7 @@ mod tests { unsafe { let concrete = &*(foreign_accum.as_ref() as *const dyn GroupsAccumulator as *const ForeignGroupsAccumulator); - assert_eq!(original_size, concrete.size()); + 
assert_eq!(original_size, concrete.size(None)); } Ok(()) diff --git a/datafusion/functions-aggregate-common/Cargo.toml b/datafusion/functions-aggregate-common/Cargo.toml index 1d4fb29d9c674..acf71a101b2bd 100644 --- a/datafusion/functions-aggregate-common/Cargo.toml +++ b/datafusion/functions-aggregate-common/Cargo.toml @@ -43,6 +43,7 @@ name = "datafusion_functions_aggregate_common" [dependencies] ahash = { workspace = true } arrow = { workspace = true } +arrow-buffer = { workspace = true, features = ["pool"] } datafusion-common = { workspace = true } datafusion-expr-common = { workspace = true } datafusion-physical-expr-common = { workspace = true } diff --git a/datafusion/functions-aggregate-common/src/aggregate/avg_distinct/decimal.rs b/datafusion/functions-aggregate-common/src/aggregate/avg_distinct/decimal.rs index 0a4c1692baa84..ffa472c1ed637 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/avg_distinct/decimal.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/avg_distinct/decimal.rs @@ -21,6 +21,7 @@ use arrow::{ Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type, DecimalType, i256, }, }; +use arrow_buffer::MemoryPool; use datafusion_common::{Result, ScalarValue}; use datafusion_expr_common::accumulator::Accumulator; use std::fmt::Debug; @@ -146,11 +147,11 @@ impl Accumulator } } - fn size(&self) -> usize { + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { let fixed_size = size_of_val(self); // Account for the size of the sum_accumulator with its contained values - fixed_size + self.sum_accumulator.size() + fixed_size + self.sum_accumulator.size(pool) } } diff --git a/datafusion/functions-aggregate-common/src/aggregate/avg_distinct/numeric.rs b/datafusion/functions-aggregate-common/src/aggregate/avg_distinct/numeric.rs index bb43acc2614f9..4f4c3cfc40d35 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/avg_distinct/numeric.rs +++ 
b/datafusion/functions-aggregate-common/src/aggregate/avg_distinct/numeric.rs @@ -19,6 +19,7 @@ use std::fmt::Debug; use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Float64Type}; +use arrow_buffer::MemoryPool; use datafusion_common::{Result, ScalarValue}; use datafusion_expr_common::accumulator::Accumulator; @@ -72,7 +73,7 @@ impl Accumulator for Float64DistinctAvgAccumulator { } } - fn size(&self) -> usize { - self.sum_accumulator.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + self.sum_accumulator.size(pool) } } diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/bytes.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/bytes.rs index 6e0d55bd64372..325b243a54002 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/bytes.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/bytes.rs @@ -18,6 +18,7 @@ //! [`BytesDistinctCountAccumulator`] for Utf8/LargeUtf8/Binary/LargeBinary values use arrow::array::{ArrayRef, OffsetSizeTrait}; +use arrow_buffer::MemoryPool; use datafusion_common::ScalarValue; use datafusion_common::cast::as_list_array; use datafusion_common::utils::SingleRowListArrayBuilder; @@ -86,7 +87,7 @@ impl Accumulator for BytesDistinctCountAccumulator { Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64))) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) + self.0.size() } } @@ -147,7 +148,7 @@ impl Accumulator for BytesViewDistinctCountAccumulator { Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64))) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) + self.0.size() } } diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/dict.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/dict.rs index d71aed3debe95..dc5655efa6b67 100644 --- 
a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/dict.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/dict.rs @@ -17,6 +17,7 @@ use arrow::array::{ArrayRef, BooleanArray}; use arrow::downcast_dictionary_array; +use arrow_buffer::MemoryPool; use datafusion_common::internal_err; use datafusion_common::{ScalarValue, arrow_datafusion_err}; use datafusion_expr_common::accumulator::Accumulator; @@ -56,8 +57,8 @@ impl Accumulator for DictionaryCountAccumulator { self.inner.evaluate() } - fn size(&self) -> usize { - self.inner.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + self.inner.size(pool) } fn state(&mut self) -> datafusion_common::Result> { diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs index aa86052bcbbc0..a865bc50661d6 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs @@ -32,6 +32,7 @@ use arrow::array::PrimitiveArray; use arrow::array::types::ArrowPrimitiveType; use arrow::datatypes::DataType; +use arrow_buffer::MemoryPool; use datafusion_common::ScalarValue; use datafusion_common::cast::{as_list_array, as_primitive_array}; use datafusion_common::utils::SingleRowListArrayBuilder; @@ -117,7 +118,7 @@ where Ok(ScalarValue::Int64(Some(self.values.len() as i64))) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { let num_elements = self.values.len(); let fixed_size = size_of_val(self) + size_of_val(&self.values); @@ -161,7 +162,7 @@ impl Accumulator for FloatDistinctCountAccumulato Ok(ScalarValue::Int64(Some(self.values.values.len() as i64))) } - fn size(&self) -> usize { - size_of_val(self) + self.values.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + size_of_val(self) + self.values.size(pool) } } 
diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs index ad2a21bb4733c..2ce070c1faaaf 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs @@ -32,6 +32,7 @@ use arrow::{ compute::take_arrays, datatypes::UInt32Type, }; +use arrow_buffer::MemoryPool; use datafusion_common::{Result, ScalarValue, arrow_datafusion_err}; use datafusion_expr_common::accumulator::Accumulator; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; @@ -121,8 +122,8 @@ impl AccumulatorState { } /// Returns the amount of memory taken by this structure and its accumulator - fn size(&self) -> usize { - self.accumulator.size() + size_of_val(self) + self.indices.allocated_size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + self.accumulator.size(pool) + size_of_val(self) + self.indices.allocated_size() } } @@ -151,7 +152,7 @@ impl GroupsAccumulatorAdapter { for _ in 0..new_accumulators { let accumulator = (self.factory)()?; let state = AccumulatorState::new(accumulator); - self.add_allocation(state.size()); + self.add_allocation(state.size(None)); self.states.push(state); } @@ -247,7 +248,7 @@ impl GroupsAccumulatorAdapter { let mut sizes_post = 0; for (&group_idx, offsets) in iter { let state = &mut self.states[group_idx]; - sizes_pre += state.size(); + sizes_pre += state.size(None); let values_to_accumulate = slice_and_maybe_filter( &values, @@ -259,7 +260,7 @@ impl GroupsAccumulatorAdapter { // clear out the state so they are empty for next // iteration state.indices.clear(); - sizes_post += state.size(); + sizes_post += state.size(None); } self.adjust_allocation(sizes_pre, sizes_post); @@ -323,7 +324,7 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter { let results: Vec = states .into_iter() .map(|mut state| { - 
self.free_allocation(state.size()); + self.free_allocation(state.size(None)); state.accumulator.evaluate() }) .collect::>()?; @@ -345,7 +346,7 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter { let mut results: Vec> = vec![]; for mut state in states { - self.free_allocation(state.size()); + self.free_allocation(state.size(None)); let accumulator_state = state.accumulator.state()?; results.resize_with(accumulator_state.len(), Vec::new); for (idx, state_val) in accumulator_state.into_iter().enumerate() { @@ -391,7 +392,7 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter { Ok(()) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { self.allocation_bytes } diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 29b8752048c3e..9c2fd8802d048 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -23,6 +23,7 @@ use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, PrimitiveArray}; use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::datatypes::ArrowPrimitiveType; +use arrow_buffer::MemoryPool; use datafusion_expr_common::groups_accumulator::EmitTo; /// Track the accumulator null state per row: if any values for that /// group were null and if any values have been seen at all for that group. 
@@ -75,7 +76,7 @@ impl NullState { } /// return the size of all buffers allocated by this null state, not including self - pub fn size(&self) -> usize { + pub fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { // capacity is in bits, so convert to bytes self.seen_values.capacity() / 8 } diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs index 149312e5a9c0f..f93873cedc6f2 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use crate::aggregate::groups_accumulator::nulls::filtered_null_mask; use arrow::array::{ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder}; use arrow::buffer::BooleanBuffer; +use arrow_buffer::MemoryPool; use datafusion_common::Result; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; @@ -139,9 +140,9 @@ where self.update_batch(values, group_indices, opt_filter, total_num_groups) } - fn size(&self) -> usize { + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { // capacity is in bits, so convert to bytes - self.values.capacity() / 8 + self.null_state.size() + self.values.capacity() / 8 + self.null_state.size(pool) } fn convert_to_state( diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs index 656b95d140dde..33973b7ed4de4 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs @@ -23,6 +23,7 @@ use arrow::buffer::NullBuffer; use arrow::compute; use arrow::datatypes::ArrowPrimitiveType; use arrow::datatypes::DataType; +use arrow_buffer::MemoryPool; use 
datafusion_common::{DataFusionError, Result, internal_datafusion_err}; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; @@ -194,7 +195,7 @@ where true } - fn size(&self) -> usize { - self.values.capacity() * size_of::() + self.null_state.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + self.values.capacity() * size_of::() + self.null_state.size(pool) } } diff --git a/datafusion/functions-aggregate-common/src/aggregate/sum_distinct/numeric.rs b/datafusion/functions-aggregate-common/src/aggregate/sum_distinct/numeric.rs index e5a23597c44ad..d92688f74753e 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/sum_distinct/numeric.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/sum_distinct/numeric.rs @@ -26,6 +26,7 @@ use arrow::array::ArrowPrimitiveType; use arrow::datatypes::ArrowNativeType; use arrow::datatypes::DataType; +use arrow_buffer::MemoryPool; use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr_common::accumulator::Accumulator; @@ -77,7 +78,7 @@ impl Accumulator for DistinctSumAccumulator { } } - fn size(&self) -> usize { - size_of_val(self) + self.values.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + size_of_val(self) + self.values.size(pool) } } diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 27620221cf23c..2c3550b747811 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -31,6 +31,7 @@ use arrow::array::{ }; use arrow::compute; use arrow::datatypes::{DataType, IntervalUnit, TimeUnit}; +use arrow_buffer::MemoryPool; use datafusion_common::{ DataFusionError, Result, ScalarValue, downcast_value, internal_err, }; @@ -459,7 +460,7 @@ impl Accumulator for MaxAccumulator { Ok(self.max.clone()) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { 
size_of_val(self) - size_of_val(&self.max) + self.max.size() } } @@ -501,7 +502,7 @@ impl Accumulator for MinAccumulator { Ok(self.min.clone()) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) - size_of_val(&self.min) + self.min.size() } } diff --git a/datafusion/functions-aggregate-common/src/noop_accumulator.rs b/datafusion/functions-aggregate-common/src/noop_accumulator.rs index e34d58770a69d..35060cad19369 100644 --- a/datafusion/functions-aggregate-common/src/noop_accumulator.rs +++ b/datafusion/functions-aggregate-common/src/noop_accumulator.rs @@ -16,6 +16,7 @@ // under the License. use arrow::array::ArrayRef; +use arrow_buffer::MemoryPool; use datafusion_common::{Result, ScalarValue}; use datafusion_expr_common::accumulator::Accumulator; @@ -54,7 +55,7 @@ impl Accumulator for NoopAccumulator { Ok(self.evaluate_value.clone()) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } diff --git a/datafusion/functions-aggregate-common/src/utils.rs b/datafusion/functions-aggregate-common/src/utils.rs index 78b8c52490c76..38522442b4cab 100644 --- a/datafusion/functions-aggregate-common/src/utils.rs +++ b/datafusion/functions-aggregate-common/src/utils.rs @@ -23,6 +23,7 @@ use arrow::compute::SortOptions; use arrow::datatypes::{ ArrowNativeType, DataType, DecimalType, Field, FieldRef, ToByteSlice, }; +use arrow_buffer::MemoryPool; use datafusion_common::cast::{as_list_array, as_primitive_array}; use datafusion_common::utils::SingleRowListArrayBuilder; use datafusion_common::utils::memory::estimate_memory_size; @@ -258,7 +259,7 @@ impl GenericDistinctBuffer { } /// Mirrors [`Accumulator::size`]. 
- pub fn size(&self) -> usize { + pub fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { let num_elements = self.values.len(); let fixed_size = size_of_val(self) + size_of_val(&self.values); estimate_memory_size::(num_elements, fixed_size).unwrap() diff --git a/datafusion/functions-aggregate/Cargo.toml b/datafusion/functions-aggregate/Cargo.toml index 8f8697fef0a1f..fdd7687c9dec9 100644 --- a/datafusion/functions-aggregate/Cargo.toml +++ b/datafusion/functions-aggregate/Cargo.toml @@ -43,6 +43,7 @@ name = "datafusion_functions_aggregate" [dependencies] ahash = { workspace = true } arrow = { workspace = true } +arrow-buffer = { workspace = true, features = ["pool"] } datafusion-common = { workspace = true } datafusion-doc = { workspace = true } datafusion-execution = { workspace = true } diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 3c9692d29c30f..3a53a48defeb4 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -30,6 +30,7 @@ use arrow::datatypes::{ UInt64Type, }; use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field}; +use arrow_buffer::MemoryPool; use datafusion_common::ScalarValue; use datafusion_common::{ DataFusionError, Result, downcast_value, internal_datafusion_err, internal_err, @@ -198,7 +199,7 @@ macro_rules! 
default_accumulator_impl { Ok(ScalarValue::UInt64(Some(self.hll.count() as u64))) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { // HLL has static size std::mem::size_of_val(self) } diff --git a/datafusion/functions-aggregate/src/approx_percentile_cont.rs b/datafusion/functions-aggregate/src/approx_percentile_cont.rs index b1e649ec029ff..e7dd1158fb966 100644 --- a/datafusion/functions-aggregate/src/approx_percentile_cont.rs +++ b/datafusion/functions-aggregate/src/approx_percentile_cont.rs @@ -30,6 +30,7 @@ use arrow::{ }, datatypes::{DataType, Field}, }; +use arrow_buffer::MemoryPool; use datafusion_common::{ DataFusionError, Result, ScalarValue, downcast_value, internal_err, not_impl_err, plan_err, @@ -479,7 +480,7 @@ impl Accumulator for ApproxPercentileAccumulator { Ok(()) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) + self.digest.size() - size_of_val(&self.digest) + self.return_type.size() - size_of_val(&self.return_type) diff --git a/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs b/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs index ff7762e816ad6..7a6ee7f7b00a8 100644 --- a/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs +++ b/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use arrow::compute::{and, filter, is_not_null}; use arrow::datatypes::FieldRef; use arrow::{array::ArrayRef, datatypes::DataType}; +use arrow_buffer::MemoryPool; use datafusion_common::ScalarValue; use datafusion_common::{Result, not_impl_err, plan_err}; use datafusion_expr::Volatility::Immutable; @@ -341,8 +342,8 @@ impl Accumulator for ApproxPercentileWithWeightAccumulator { Ok(()) } - fn size(&self) -> usize { + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) - size_of_val(&self.approx_percentile_cont_accumulator) - + 
self.approx_percentile_cont_accumulator.size() + + self.approx_percentile_cont_accumulator.size(pool) } } diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 8491062124954..e6bbe6507cf70 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -27,6 +27,7 @@ use arrow::array::{ }; use arrow::compute::{SortOptions, filter}; use arrow::datatypes::{DataType, Field, FieldRef, Fields}; +use arrow_buffer::MemoryPool; use datafusion_common::cast::as_list_array; use datafusion_common::utils::{ @@ -44,6 +45,8 @@ use datafusion_functions_aggregate_common::utils::ordering_fields; use datafusion_macros::user_doc; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; +use crate::utils::claim_buffers_recursive; + make_udaf_expr_and_func!( ArrayAgg, array_agg, @@ -390,27 +393,25 @@ impl Accumulator for ArrayAggAccumulator { Ok(SingleRowListArrayBuilder::new(concated_array).build_list_scalar()) } - fn size(&self) -> usize { - size_of_val(self) + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + let mut total = size_of_val(self) + (size_of::() * self.values.capacity()) - + self + + self.datatype.size() + - size_of_val(&self.datatype); + + if let Some(pool) = pool { + for arr in &self.values { + claim_buffers_recursive(&arr.to_data(), pool); + } + } else { + total += self .values .iter() - // Each ArrayRef might be just a reference to a bigger array, and many - // ArrayRefs here might be referencing exactly the same array, so if we - // were to call `arr.get_array_memory_size()`, we would be double-counting - // the same underlying data many times. - // - // Instead, we do an approximation by estimating how much memory each - // ArrayRef would occupy if its underlying data was fully owned by this - // accumulator. 
- // - // Note that this is just an estimation, but the reality is that this - // accumulator might not own any data. .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default()) - .sum::() - + self.datatype.size() - - size_of_val(&self.datatype) + .sum::(); + } + + total } } @@ -458,8 +459,7 @@ impl Accumulator for DistinctArrayAggAccumulator { if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) { for i in 0..val.len() { if nulls.is_none_or(|nulls| nulls.is_valid(i)) { - self.values - .insert(ScalarValue::try_from_array(val, i)?.compacted()); + self.values.insert(ScalarValue::try_from_array(val, i)?); } } } @@ -518,13 +518,29 @@ impl Accumulator for DistinctArrayAggAccumulator { Ok(ScalarValue::List(arr)) } - fn size(&self) -> usize { - size_of_val(self) + ScalarValue::size_of_hashset(&self.values) - - size_of_val(&self.values) + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + let mut total = size_of_val(self) + + size_of_val(&self.values) + + (size_of::() * self.values.capacity()) + self.datatype.size() - size_of_val(&self.datatype) - size_of_val(&self.sort_options) - + size_of::>() + + size_of::>(); + + for scalar in &self.values { + if let Some(array) = scalar.get_array_ref() { + total += size_of::>(); + if let Some(pool) = pool { + claim_buffers_recursive(&array.to_data(), pool); + } else { + total += scalar.size() - size_of_val(scalar); + } + } else { + total += scalar.size() - size_of_val(scalar); + } + } + + total } } @@ -643,14 +659,8 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) { for i in 0..val.len() { if nulls.is_none_or(|nulls| nulls.is_valid(i)) { - self.values - .push(ScalarValue::try_from_array(val, i)?.compacted()); - self.ordering_values.push( - get_row_at_idx(ord, i)? - .into_iter() - .map(|v| v.compacted()) - .collect(), - ) + self.values.push(ScalarValue::try_from_array(val, i)?); + self.ordering_values.push(get_row_at_idx(ord, i)?) 
} } } @@ -773,25 +783,48 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { Ok(ScalarValue::List(array)) } - fn size(&self) -> usize { - let mut total = size_of_val(self) + ScalarValue::size_of_vec(&self.values) - - size_of_val(&self.values); + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + let mut total = size_of_val(self) + + size_of_val(&self.values) + + (size_of::() * self.values.capacity()); + + for scalar in &self.values { + if let Some(array) = scalar.get_array_ref() { + total += size_of::>(); + if let Some(pool) = pool { + claim_buffers_recursive(&array.to_data(), pool); + } else { + total += scalar.size() - size_of_val(scalar); + } + } else { + total += scalar.size() - size_of_val(scalar); + } + } - // Add size of the `self.ordering_values` total += size_of::>() * self.ordering_values.capacity(); for row in &self.ordering_values { - total += ScalarValue::size_of_vec(row) - size_of_val(row); + total += size_of_val(row) + (size_of::() * row.capacity()); + for scalar in row { + if let Some(array) = scalar.get_array_ref() { + total += size_of::>(); + if let Some(pool) = pool { + claim_buffers_recursive(&array.to_data(), pool); + } else { + total += + array.to_data().get_slice_memory_size().unwrap_or_default(); + } + } else { + total += scalar.size() - size_of_val(scalar); + } + } } - // Add size of the `self.datatypes` total += size_of::() * self.datatypes.capacity(); for dtype in &self.datatypes { total += dtype.size() - size_of_val(dtype); } - // Add size of the `self.ordering_req` total += size_of::() * self.ordering_req.capacity(); - // TODO: Calculate size of each `PhysicalSortExpr` more accurately. 
total } } @@ -801,6 +834,7 @@ mod tests { use super::*; use arrow::array::{ListBuilder, StringBuilder}; use arrow::datatypes::{FieldRef, Schema}; + use arrow_buffer::TrackingMemoryPool; use datafusion_common::cast::as_generic_string_array; use datafusion_common::internal_err; use datafusion_physical_expr::PhysicalExpr; @@ -1075,10 +1109,21 @@ mod tests { acc2.update_batch(&[data(["b", "c", "a"])])?; acc1 = merge(acc1, acc2)?; - assert_eq!(acc1.size(), 266); + // Calculate size without using the arrow memory pool (it will default to use get_slice_memory_size) + let size_without_pool = acc1.size(None); + assert_eq!(size_without_pool, 266); + + // Calculate size using the arrow memory pool. + // pool.used() returns the full physical buffer capacity (offsets, nulls, data, capacity overhead), + // while get_slice_memory_size() only counts logical data size. + let pool = TrackingMemoryPool::default(); + let fixed_size = acc1.size(Some(&pool)); + let size_with_pool = pool.used() + fixed_size; + assert_eq!(size_with_pool, 2316); Ok(()) } + #[test] fn does_not_over_account_memory_distinct() -> Result<()> { let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() @@ -1092,8 +1137,10 @@ mod tests { acc2.update_batch(&[string_list_data([vec!["e", "f", "g"]])])?; acc1 = merge(acc1, acc2)?; - // without compaction, the size is 16660 - assert_eq!(acc1.size(), 1660); + let pool = TrackingMemoryPool::default(); + let total_size = acc1.size(Some(&pool)); + // if we don't use the memory pool: size(None); we get 16576 + assert_eq!(total_size, 484); Ok(()) } @@ -1110,8 +1157,10 @@ mod tests { vec!["b", "c", "d"], ])])?; - // without compaction, the size is 17112 - assert_eq!(acc.size(), 2184); + let pool = TrackingMemoryPool::default(); + let total_size = acc.size(Some(&pool)); + // if we don't use the memory pool: size(None); we get 17148 + assert_eq!(total_size, 1056); Ok(()) } diff --git a/datafusion/functions-aggregate/src/average.rs 
b/datafusion/functions-aggregate/src/average.rs index 46a8dbf9540b6..e86f3954a2b80 100644 --- a/datafusion/functions-aggregate/src/average.rs +++ b/datafusion/functions-aggregate/src/average.rs @@ -31,6 +31,7 @@ use arrow::datatypes::{ DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, DurationSecondType, Field, FieldRef, Float64Type, TimeUnit, UInt64Type, i256, }; +use arrow_buffer::MemoryPool; use datafusion_common::types::{NativeType, logical_float64}; use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; @@ -529,7 +530,7 @@ impl Accumulator for AvgAccumulator { )) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } @@ -607,7 +608,7 @@ impl Accumulator for DecimalAvgAccumu ) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } @@ -685,7 +686,7 @@ impl Accumulator for DurationAvgAccumulator { } } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } @@ -948,7 +949,7 @@ where true } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { self.counts.capacity() * size_of::() + self.sums.capacity() * size_of::() } } diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index 734a916e2a870..658e760bfab68 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -30,6 +30,7 @@ use arrow::datatypes::{ Int32Type, Int64Type, UInt8Type, UInt16Type, UInt32Type, UInt64Type, }; +use arrow_buffer::MemoryPool; use datafusion_common::cast::as_list_array; use datafusion_common::{Result, ScalarValue, not_impl_err}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; @@ -356,7 +357,7 @@ where 
ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } @@ -401,7 +402,7 @@ where ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } @@ -455,7 +456,7 @@ where ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } @@ -518,7 +519,7 @@ where ScalarValue::new_primitive::(v, &T::DATA_TYPE) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) + self.values.capacity() * size_of::() } diff --git a/datafusion/functions-aggregate/src/bool_and_or.rs b/datafusion/functions-aggregate/src/bool_and_or.rs index a107024e2fb4f..de41af48df4a4 100644 --- a/datafusion/functions-aggregate/src/bool_and_or.rs +++ b/datafusion/functions-aggregate/src/bool_and_or.rs @@ -27,6 +27,7 @@ use arrow::compute::bool_or as compute_bool_or; use arrow::datatypes::Field; use arrow::datatypes::{DataType, FieldRef}; +use arrow_buffer::MemoryPool; use datafusion_common::internal_err; use datafusion_common::{Result, ScalarValue}; use datafusion_common::{downcast_value, not_impl_err}; @@ -215,7 +216,7 @@ impl Accumulator for BoolAndAccumulator { Ok(ScalarValue::Boolean(self.acc)) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } @@ -353,7 +354,7 @@ impl Accumulator for BoolOrAccumulator { Ok(ScalarValue::Boolean(self.acc)) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } diff --git a/datafusion/functions-aggregate/src/correlation.rs b/datafusion/functions-aggregate/src/correlation.rs index 538311dfa2637..f3fccf3f47e88 100644 --- a/datafusion/functions-aggregate/src/correlation.rs +++ 
b/datafusion/functions-aggregate/src/correlation.rs @@ -32,6 +32,7 @@ use arrow::{ array::ArrayRef, datatypes::{DataType, Field}, }; +use arrow_buffer::MemoryPool; use datafusion_expr::{EmitTo, GroupsAccumulator}; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate_multiple; use log::debug; @@ -228,12 +229,12 @@ impl Accumulator for CorrelationAccumulator { Ok(ScalarValue::Float64(None)) } - fn size(&self) -> usize { - size_of_val(self) - size_of_val(&self.covar) + self.covar.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + size_of_val(self) - size_of_val(&self.covar) + self.covar.size(pool) - size_of_val(&self.stddev1) - + self.stddev1.size() + + self.stddev1.size(pool) - size_of_val(&self.stddev2) - + self.stddev2.size() + + self.stddev2.size(pool) } fn state(&mut self) -> Result> { @@ -536,7 +537,7 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator { Ok(()) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(&self.count) + size_of_val(&self.sum_x) + size_of_val(&self.sum_y) diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index a7c819acafea8..020c05711fca4 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use crate::utils::claim_buffers_recursive; use ahash::RandomState; use arrow::{ array::{Array, ArrayRef, AsArray, BooleanArray, Int64Array, PrimitiveArray}, @@ -29,6 +30,7 @@ use arrow::{ UInt8Type, UInt16Type, UInt32Type, UInt64Type, }, }; +use arrow_buffer::MemoryPool; use datafusion_common::{ HashMap, Result, ScalarValue, downcast_value, internal_err, not_impl_err, stats::Precision, utils::expr::COUNT_STAR_EXPANSION, @@ -500,7 +502,7 @@ impl Accumulator for SlidingDistinctCountAccumulator { true } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } } @@ -551,7 +553,7 @@ impl Accumulator for CountAccumulator { true } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } } @@ -714,7 +716,7 @@ impl GroupsAccumulator for CountGroupsAccumulator { true } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { self.counts.capacity() * size_of::() } } @@ -772,15 +774,25 @@ impl DistinctCountAccumulator { // calculates the size as accurately as possible. 
Note that calling this // method is expensive - fn full_size(&self) -> usize { - size_of_val(self) + fn full_size(&self, pool: Option<&dyn MemoryPool>) -> usize { + let mut total = size_of_val(self) + (size_of::() * self.values.capacity()) - + self - .values - .iter() - .map(|vals| ScalarValue::size(vals) - size_of_val(vals)) - .sum::() - + size_of::() + + size_of::(); + + for scalar in &self.values { + if let Some(array) = scalar.get_array_ref() { + total += size_of::>(); + if let Some(pool) = pool { + claim_buffers_recursive(&array.to_data(), pool); + } else { + total += scalar.size() - size_of_val(scalar); + } + } else { + total += scalar.size() - size_of_val(scalar); + } + } + + total } } @@ -839,11 +851,11 @@ impl Accumulator for DistinctCountAccumulator { Ok(ScalarValue::Int64(Some(self.values.len() as i64))) } - fn size(&self) -> usize { + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { match &self.state_data_type { DataType::Boolean | DataType::Null => self.fixed_size(), d if d.is_primitive() => self.fixed_size(), - _ => self.full_size(), + _ => self.full_size(pool), } } } @@ -1045,4 +1057,51 @@ mod tests { assert_eq!(merged.evaluate()?, ScalarValue::Int64(Some(3))); Ok(()) } + + #[test] + fn distinct_count_does_not_over_account_memory() -> Result<()> { + use arrow::array::ListArray; + use arrow_buffer::TrackingMemoryPool; + + // Create a DistinctCountAccumulator for List (array type) + let mut acc = DistinctCountAccumulator { + values: HashSet::default(), + state_data_type: DataType::List(Arc::new(Field::new_list_field( + DataType::Int32, + true, + ))), + }; + + // Create list arrays with shared buffers (slices of the same underlying data) + let list_array = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2), Some(3)]), + Some(vec![Some(4), Some(5), Some(6)]), + Some(vec![Some(1), Some(2), Some(3)]), // duplicate + Some(vec![Some(7), Some(8), Some(9)]), + Some(vec![Some(4), Some(5), Some(6)]), // duplicate + ]); + + 
acc.update_batch(&[Arc::new(list_array)])?; + + // Should have 3 distinct arrays + assert_eq!(acc.values.len(), 3); + + // Test with memory pool - should not over-account shared buffers + let pool = TrackingMemoryPool::default(); + let structural_size = acc.size(Some(&pool)); + let total_size_with_pool = structural_size + pool.used(); + + // Test without pool - uses scalar.size() which may over-account + let size_without_pool = acc.size(None); + + // With pool should be much smaller than without pool due to deduplication + // The pool tracks actual physical buffers, avoiding double-counting + // Without the pool we get 13544, while when using the pool we get 4728 + assert!( + total_size_with_pool < size_without_pool, + "Pool-based size ({total_size_with_pool}) should be less than non-pool size ({size_without_pool}) due to buffer deduplication" + ); + + Ok(()) + } } diff --git a/datafusion/functions-aggregate/src/covariance.rs b/datafusion/functions-aggregate/src/covariance.rs index e86d742db3d45..35bec4b4f4c77 100644 --- a/datafusion/functions-aggregate/src/covariance.rs +++ b/datafusion/functions-aggregate/src/covariance.rs @@ -23,6 +23,7 @@ use arrow::{ compute::kernels::cast, datatypes::{DataType, Field}, }; +use arrow_buffer::MemoryPool; use datafusion_common::{ Result, ScalarValue, downcast_value, plan_err, unwrap_or_internal_err, }; @@ -434,7 +435,7 @@ impl Accumulator for CovarianceAccumulator { } } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } } diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 5f3490f535a46..08c6d7e78acdb 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -37,6 +37,7 @@ use arrow::datatypes::{ TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type, }; +use arrow_buffer::MemoryPool; use
datafusion_common::cast::as_boolean_array; use datafusion_common::utils::{compare_rows, extract_row_at_idx_to_buf, get_row_at_idx}; use datafusion_common::{ @@ -53,6 +54,8 @@ use datafusion_functions_aggregate_common::utils::get_sort_options; use datafusion_macros::user_doc; use datafusion_physical_expr_common::sort_expr::LexOrdering; +use crate::utils::claim_buffers_recursive; + create_func!(FirstValue, first_value_udaf); create_func!(LastValue, last_value_udaf); @@ -743,7 +746,7 @@ where Ok(()) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { self.vals.capacity() * size_of::() + self.null_builder.capacity() / 8 // capacity is in bits, so convert to bytes + self.is_sets.capacity() / 8 @@ -854,8 +857,21 @@ impl Accumulator for TrivialFirstValueAccumulator { Ok(self.first.clone()) } - fn size(&self) -> usize { - size_of_val(self) - size_of_val(&self.first) + self.first.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + let mut total = size_of_val(self) - size_of_val(&self.first); + if let Some(array) = self.first.get_array_ref() { + total += size_of::>(); + if let Some(pool) = pool { + // With pool: claim buffers for accurate accounting with deduplication + claim_buffers_recursive(&array.to_data(), pool); + } else { + // Without pool: count buffer size (potential double accounting) + total += self.first.size() - size_of_val(&self.first); + } + } else { + total += self.first.size(); + } + total } } @@ -1018,11 +1034,36 @@ impl Accumulator for FirstValueAccumulator { Ok(self.first.clone()) } - fn size(&self) -> usize { - size_of_val(self) - size_of_val(&self.first) - + self.first.size() - + ScalarValue::size_of_vec(&self.orderings) - - size_of_val(&self.orderings) + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + let mut total = + size_of_val(self) - size_of_val(&self.first) - size_of_val(&self.orderings) + + size_of::() * self.orderings.capacity(); + + if let Some(array) = self.first.get_array_ref() 
{ + total += size_of::>(); + if let Some(pool) = pool { + claim_buffers_recursive(&array.to_data(), pool); + } else { + total += self.first.size() - size_of_val(&self.first); + } + } else { + total += self.first.size(); + } + + for scalar in &self.orderings { + if let Some(array) = scalar.get_array_ref() { + total += size_of::>(); + if let Some(pool) = pool { + claim_buffers_recursive(&array.to_data(), pool); + } else { + total += scalar.size() - size_of_val(scalar); + } + } else { + total += scalar.size() - size_of_val(scalar); + } + } + + total } } @@ -1356,8 +1397,19 @@ impl Accumulator for TrivialLastValueAccumulator { Ok(self.last.clone()) } - fn size(&self) -> usize { - size_of_val(self) - size_of_val(&self.last) + self.last.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + let mut total = size_of_val(self) - size_of_val(&self.last); + if let Some(array) = self.last.get_array_ref() { + total += size_of::>(); + if let Some(pool) = pool { + claim_buffers_recursive(&array.to_data(), pool); + } else { + total += self.last.size() - size_of_val(&self.last); + } + } else { + total += self.last.size(); + } + total } } @@ -1524,11 +1576,36 @@ impl Accumulator for LastValueAccumulator { Ok(self.last.clone()) } - fn size(&self) -> usize { - size_of_val(self) - size_of_val(&self.last) - + self.last.size() - + ScalarValue::size_of_vec(&self.orderings) - - size_of_val(&self.orderings) + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + let mut total = + size_of_val(self) - size_of_val(&self.last) - size_of_val(&self.orderings) + + size_of::() * self.orderings.capacity(); + + if let Some(array) = self.last.get_array_ref() { + total += size_of::>(); + if let Some(pool) = pool { + claim_buffers_recursive(&array.to_data(), pool); + } else { + total += self.last.size() - size_of_val(&self.last); + } + } else { + total += self.last.size(); + } + + for scalar in &self.orderings { + if let Some(array) = scalar.get_array_ref() { + total += size_of::>(); + 
if let Some(pool) = pool { + claim_buffers_recursive(&array.to_data(), pool); + } else { + total += scalar.size() - size_of_val(scalar); + } + } else { + total += scalar.size() - size_of_val(scalar); + } + } + + total } } @@ -1567,14 +1644,14 @@ fn convert_to_sort_cols(arrs: &[ArrayRef], sort_exprs: &LexOrdering) -> Vec( @@ -1968,7 +2047,9 @@ mod tests { last_accumulator.update_batch(values)?; - Ok(last_accumulator.size()) + let pool = TrackingMemoryPool::default(); + let structural_size = last_accumulator.size(Some(&pool)); + Ok(structural_size + pool.used()) } let batch1 = ListArray::from_iter_primitive::( diff --git a/datafusion/functions-aggregate/src/median.rs b/datafusion/functions-aggregate/src/median.rs index 29b8857254dd3..c3c99dff82bad 100644 --- a/datafusion/functions-aggregate/src/median.rs +++ b/datafusion/functions-aggregate/src/median.rs @@ -39,6 +39,7 @@ use arrow::datatypes::{ ArrowNativeType, ArrowPrimitiveType, Decimal32Type, Decimal64Type, FieldRef, }; +use arrow_buffer::MemoryPool; use datafusion_common::{ DataFusionError, Result, ScalarValue, assert_eq_or_internal_err, internal_datafusion_err, @@ -294,7 +295,7 @@ impl Accumulator for MedianAccumulator { ScalarValue::new_primitive::(median, &self.data_type) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) + self.all_values.capacity() * size_of::() } @@ -536,7 +537,7 @@ impl GroupsAccumulator for MedianGroupsAccumulator usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { self.group_values .iter() .map(|values| values.capacity() * size_of::()) @@ -574,8 +575,8 @@ impl Accumulator for DistinctMedianAccumulator { ScalarValue::new_primitive::(median, &self.data_type) } - fn size(&self) -> usize { - size_of_val(self) + self.distinct_values.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + size_of_val(self) + self.distinct_values.size(pool) } } diff --git a/datafusion/functions-aggregate/src/min_max.rs 
b/datafusion/functions-aggregate/src/min_max.rs index 0eebad9e3a5c3..5d4b36a0c5f24 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -28,6 +28,7 @@ use arrow::datatypes::{ DurationSecondType, Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type, UInt16Type, UInt32Type, UInt64Type, }; +use arrow_buffer::MemoryPool; use datafusion_common::stats::Precision; use datafusion_common::{ColumnStatistics, Result, exec_err, internal_err}; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; @@ -437,7 +438,7 @@ impl Accumulator for SlidingMaxAccumulator { true } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) - size_of_val(&self.max) + self.max.size() } } @@ -729,7 +730,7 @@ impl Accumulator for SlidingMinAccumulator { true } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) - size_of_val(&self.min) + self.min.size() } } diff --git a/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs b/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs index e4ac7eccf5692..d1e956014f9fd 100644 --- a/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs +++ b/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs @@ -20,6 +20,7 @@ use arrow::array::{ LargeBinaryBuilder, LargeStringBuilder, StringBuilder, StringViewBuilder, }; use arrow::datatypes::DataType; +use arrow_buffer::MemoryPool; use datafusion_common::hash_map::Entry; use datafusion_common::{HashMap, Result, internal_err}; use datafusion_expr::{EmitTo, GroupsAccumulator}; @@ -329,8 +330,8 @@ impl GroupsAccumulator for MinMaxBytesAccumulator { true } - fn size(&self) -> usize { - self.inner.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + self.inner.size(pool) } } @@ -504,7 +505,7 @@ impl MinMaxBytesState { } } - fn 
size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { self.total_data_bytes + self.min_max.len() * size_of::>>() } } diff --git a/datafusion/functions-aggregate/src/min_max/min_max_struct.rs b/datafusion/functions-aggregate/src/min_max/min_max_struct.rs index 796fd586ca5c8..53d4ec81c6cb5 100644 --- a/datafusion/functions-aggregate/src/min_max/min_max_struct.rs +++ b/datafusion/functions-aggregate/src/min_max/min_max_struct.rs @@ -23,6 +23,7 @@ use arrow::{ }, datatypes::DataType, }; +use arrow_buffer::MemoryPool; use datafusion_common::{ Result, internal_err, scalar::{copy_array_data, partial_cmp_struct}, @@ -154,8 +155,8 @@ impl GroupsAccumulator for MinMaxStructAccumulator { true } - fn size(&self) -> usize { - self.inner.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + self.inner.size(pool) } } @@ -293,7 +294,7 @@ impl MinMaxStructState { } } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { self.total_data_bytes + self.min_max.len() * size_of::>() } } diff --git a/datafusion/functions-aggregate/src/nth_value.rs b/datafusion/functions-aggregate/src/nth_value.rs index bc343a1969c09..cc8ebd1606edc 100644 --- a/datafusion/functions-aggregate/src/nth_value.rs +++ b/datafusion/functions-aggregate/src/nth_value.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use arrow::array::{ArrayRef, AsArray, StructArray, new_empty_array}; use arrow::datatypes::{DataType, Field, FieldRef, Fields}; +use arrow_buffer::MemoryPool; use datafusion_common::utils::{SingleRowListArrayBuilder, get_row_at_idx}; use datafusion_common::{ Result, ScalarValue, assert_or_internal_err, exec_err, not_impl_err, @@ -308,7 +309,7 @@ impl Accumulator for TrivialNthValueAccumulator { } } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) + ScalarValue::size_of_vec_deque(&self.values) - size_of_val(&self.values) + size_of::() @@ -522,7 +523,7 @@ impl Accumulator for 
NthValueAccumulator { } } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { let mut total = size_of_val(self) + ScalarValue::size_of_vec_deque(&self.values) - size_of_val(&self.values); diff --git a/datafusion/functions-aggregate/src/percentile_cont.rs b/datafusion/functions-aggregate/src/percentile_cont.rs index d6c8eabb459e6..61a297fe15a72 100644 --- a/datafusion/functions-aggregate/src/percentile_cont.rs +++ b/datafusion/functions-aggregate/src/percentile_cont.rs @@ -32,6 +32,7 @@ use arrow::{ }; use arrow::array::ArrowNativeTypeOp; +use arrow_buffer::MemoryPool; use crate::min_max::{max_udaf, min_udaf}; use datafusion_common::{ @@ -538,7 +539,7 @@ impl Accumulator for PercentileContAccumulator { ScalarValue::new_primitive::(value, &self.data_type) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) + self.all_values.capacity() * size_of::() } } @@ -737,7 +738,7 @@ impl GroupsAccumulator true } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { self.group_values .iter() .map(|values| values.capacity() * size_of::()) @@ -776,8 +777,8 @@ impl Accumulator for DistinctPercentileContAccumula ScalarValue::new_primitive::(value, &self.data_type) } - fn size(&self) -> usize { - size_of_val(self) + self.distinct_values.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + size_of_val(self) + self.distinct_values.size(pool) } } diff --git a/datafusion/functions-aggregate/src/regr.rs b/datafusion/functions-aggregate/src/regr.rs index bbc5567dab9d6..eb632057da834 100644 --- a/datafusion/functions-aggregate/src/regr.rs +++ b/datafusion/functions-aggregate/src/regr.rs @@ -25,6 +25,7 @@ use arrow::{ datatypes::DataType, datatypes::Field, }; +use arrow_buffer::MemoryPool; use datafusion_common::{ HashMap, Result, ScalarValue, downcast_value, plan_err, unwrap_or_internal_err, }; @@ -806,7 +807,7 @@ impl Accumulator for RegrAccumulator { 
} } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } } diff --git a/datafusion/functions-aggregate/src/stddev.rs b/datafusion/functions-aggregate/src/stddev.rs index 13eb5e1660b52..c190fda94db10 100644 --- a/datafusion/functions-aggregate/src/stddev.rs +++ b/datafusion/functions-aggregate/src/stddev.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use arrow::array::Float64Array; use arrow::datatypes::FieldRef; use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field}; +use arrow_buffer::MemoryPool; use datafusion_common::{Result, internal_err, not_impl_err}; use datafusion_common::{ScalarValue, plan_err}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; @@ -329,8 +330,8 @@ impl Accumulator for StddevAccumulator { } } - fn size(&self) -> usize { - align_of_val(self) - align_of_val(&self.variance) + self.variance.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + align_of_val(self) - align_of_val(&self.variance) + self.variance.size(pool) } fn supports_retract_batch(&self) -> bool { @@ -384,8 +385,8 @@ impl GroupsAccumulator for StddevGroupsAccumulator { self.variance.state(emit_to) } - fn size(&self) -> usize { - self.variance.size() + fn size(&self, pool: Option<&dyn MemoryPool>) -> usize { + self.variance.size(pool) } } diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 77e9f60afd3cf..06aad03e4555e 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -25,6 +25,7 @@ use crate::array_agg::ArrayAgg; use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, FieldRef}; +use arrow_buffer::MemoryPool; use datafusion_common::cast::{ as_generic_string_array, as_string_array, as_string_view_array, }; @@ -290,9 +291,9 @@ impl Accumulator for StringAggAccumulator { ))) } - fn size(&self) -> usize { + fn size(&self, pool: Option<&dyn 
MemoryPool>) -> usize { size_of_val(self) - size_of_val(&self.array_agg_acc) - + self.array_agg_acc.size() + + self.array_agg_acc.size(pool) + self.delimiter.capacity() } @@ -394,7 +395,7 @@ impl Accumulator for SimpleStringAggAccumulator { Ok(result) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) + self.delimiter.capacity() + self.accumulated_string.capacity() } diff --git a/datafusion/functions-aggregate/src/sum.rs b/datafusion/functions-aggregate/src/sum.rs index 198ba54adfa2a..abfcf52d3b287 100644 --- a/datafusion/functions-aggregate/src/sum.rs +++ b/datafusion/functions-aggregate/src/sum.rs @@ -27,6 +27,7 @@ use arrow::datatypes::{ DurationMillisecondType, DurationNanosecondType, DurationSecondType, FieldRef, Float64Type, Int64Type, TimeUnit, UInt64Type, }; +use arrow_buffer::MemoryPool; use datafusion_common::types::{ NativeType, logical_float64, logical_int8, logical_int16, logical_int32, logical_int64, logical_uint8, logical_uint16, logical_uint32, logical_uint64, @@ -391,7 +392,7 @@ impl Accumulator for SumAccumulator { ScalarValue::new_primitive::(self.sum, &self.data_type) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } } @@ -451,7 +452,7 @@ impl Accumulator for SlidingSumAccumulator { ScalarValue::new_primitive::(v, &self.data_type) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } @@ -515,7 +516,7 @@ impl Accumulator for SlidingDistinctSumAccumulator { Ok(ScalarValue::Int64(Some(self.sum))) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } diff --git a/datafusion/functions-aggregate/src/utils.rs b/datafusion/functions-aggregate/src/utils.rs index 5e1925fcdbb5d..37939d0640cdc 100644 --- a/datafusion/functions-aggregate/src/utils.rs +++ b/datafusion/functions-aggregate/src/utils.rs @@ -23,6 +23,19 @@ use 
datafusion_common::{DataFusionError, Result, ScalarValue, internal_err, plan use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +/// Recursively claims all buffers in an ArrayData and its children with the memory pool. +pub(crate) fn claim_buffers_recursive( + data: &arrow::array::ArrayData, + pool: &dyn arrow_buffer::MemoryPool, +) { + for buffer in data.buffers() { + buffer.claim(pool); + } + for child in data.child_data() { + claim_buffers_recursive(child, pool); + } +} + /// Evaluates a physical expression to extract its scalar value. /// /// This is used to extract constant values from expressions (like percentile parameters) diff --git a/datafusion/functions-aggregate/src/variance.rs b/datafusion/functions-aggregate/src/variance.rs index e6978c15d0bf7..71a586e112f33 100644 --- a/datafusion/functions-aggregate/src/variance.rs +++ b/datafusion/functions-aggregate/src/variance.rs @@ -25,6 +25,7 @@ use arrow::{ compute::kernels::cast, datatypes::{DataType, Field}, }; +use arrow_buffer::MemoryPool; use datafusion_common::{Result, ScalarValue, downcast_value, not_impl_err, plan_err}; use datafusion_expr::{ Accumulator, AggregateUDFImpl, Documentation, GroupsAccumulator, Signature, @@ -407,7 +408,7 @@ impl Accumulator for VarianceAccumulator { })) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } @@ -574,7 +575,7 @@ impl GroupsAccumulator for VarianceGroupsAccumulator { ]) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { self.m2s.capacity() * size_of::() + self.means.capacity() * size_of::() + self.counts.capacity() * size_of::() diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 68e67fa018f08..648a37510531c 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -49,6 +49,7 @@ name = "datafusion_physical_plan" [dependencies] ahash = { 
workspace = true } arrow = { workspace = true } +arrow-buffer = { workspace = true, features = ["pool"] } arrow-ord = { workspace = true } arrow-schema = { workspace = true } async-trait = { workspace = true } diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index a55d70ca6fb27..4d0e067652996 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -25,6 +25,7 @@ use crate::metrics::{BaselineMetrics, RecordOutput}; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; +use arrow_buffer::TrackingMemoryPool; use datafusion_common::{Result, ScalarValue, internal_datafusion_err, internal_err}; use datafusion_execution::TaskContext; use datafusion_expr::Operator; @@ -71,6 +72,7 @@ struct AggregateStreamInner { // ==== Execution Resources ==== baseline_metrics: BaselineMetrics, + arrow_pool: TrackingMemoryPool, reservation: MemoryReservation, } @@ -321,6 +323,7 @@ impl AggregateStream { aggregate_expressions, filter_expressions, accumulators, + arrow_pool: TrackingMemoryPool::default(), reservation, finished: false, agg_dyn_filter_state: maybe_dynamic_filter, @@ -343,6 +346,7 @@ impl AggregateStream { &mut this.accumulators, &this.aggregate_expressions, &this.filter_expressions, + &this.arrow_pool, ) }; @@ -430,6 +434,7 @@ fn aggregate_batch( accumulators: &mut [AccumulatorItem], expressions: &[Vec>], filters: &[Option>], + arrow_pool: &TrackingMemoryPool, ) -> Result { let mut allocated = 0usize; @@ -454,7 +459,7 @@ fn aggregate_batch( let values = evaluate_expressions_to_arrays(expr, batch.as_ref())?; // 1.4 - let size_pre = accum.size(); + let size_pre = accum.size(Some(arrow_pool)); let res = match mode { AggregateMode::Partial | AggregateMode::Single @@ -463,7 +468,7 @@ fn aggregate_batch( accum.merge_batch(&values) } }; - let size_post = 
accum.size(); + let size_post = accum.size(Some(arrow_pool)); allocated += size_post.saturating_sub(size_pre); res })?; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index cb22fbf9a06a1..bf1321b58e777 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -38,12 +38,12 @@ use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::*; use arrow::datatypes::SchemaRef; +use arrow_buffer::TrackingMemoryPool; use datafusion_common::{ DataFusionError, Result, assert_eq_or_internal_err, assert_or_internal_err, internal_err, }; use datafusion_execution::TaskContext; -use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_expr::{EmitTo, GroupsAccumulator}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; @@ -440,6 +440,11 @@ pub(crate) struct GroupedHashAggregateStream { // EXECUTION RESOURCES: // Fields related to managing execution resources and monitoring performance. // ======================================================================== + /// Arrow memory pool for accurate buffer tracking + /// This pool tracks all Arrow buffers owned by accumulators + /// to provide accurate memory accounting without double-counting shared buffers. 
+ arrow_pool: TrackingMemoryPool, + /// The memory reservation for this grouping reservation: MemoryReservation, @@ -668,6 +673,7 @@ impl GroupedHashAggregateStream { aggregate_arguments, filter_expressions, group_by: agg_group_by, + arrow_pool: TrackingMemoryPool::default(), reservation, oom_mode, group_values, @@ -1056,12 +1062,16 @@ impl GroupedHashAggregateStream { } fn update_memory_reservation(&mut self) -> Result<()> { - let acc = self.accumulators.iter().map(|x| x.size()).sum::(); - let new_size = acc - + self.group_values.size() + let total_size = self.group_values.size() + self.group_ordering.size() - + self.current_group_indices.allocated_size(); - let reservation_result = self.reservation.try_resize(new_size); + + self.current_group_indices.capacity() * size_of::() + + self + .accumulators + .iter() + .map(|x| x.size(Some(&self.arrow_pool))) + .sum::(); + + let reservation_result = self.reservation.try_resize(total_size); if reservation_result.is_ok() { self.spill_state diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index b00bd0dcc6bfd..1d1386ce3975f 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -49,6 +49,7 @@ avro = ["datafusion-datasource-avro", "datafusion-common/avro"] [dependencies] arrow = { workspace = true } +arrow-buffer = { workspace = true, features = ["pool"] } chrono = { workspace = true } datafusion-catalog = { workspace = true } datafusion-catalog-listing = { workspace = true } diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 77676fc2fd2d9..f2831561dc973 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -24,6 +24,7 @@ use arrow::datatypes::{ TimeUnit, UnionFields, UnionMode, }; use arrow::util::pretty::pretty_format_batches; +use arrow_buffer::MemoryPool; use datafusion::datasource::file_format::json::{JsonFormat, 
JsonFormatFactory}; use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, @@ -2214,7 +2215,7 @@ fn roundtrip_aggregate_udf() { Ok(ScalarValue::Float64(None)) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } } @@ -2515,7 +2516,7 @@ fn roundtrip_window() { Ok(ScalarValue::Float64(None)) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index aa5458849330f..3c61a86be4c0c 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -30,6 +30,7 @@ use crate::cases::{ use arrow::array::RecordBatch; use arrow::csv::WriterBuilder; use arrow::datatypes::{Fields, TimeUnit}; +use arrow_buffer::MemoryPool; use datafusion::physical_expr::aggregate::AggregateExprBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::metrics::MetricType; @@ -711,7 +712,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> { Ok(ScalarValue::Int64(Some(0))) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { 0 } } diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml index 09959db41fe60..97a46932dcf18 100644 --- a/datafusion/spark/Cargo.toml +++ b/datafusion/spark/Cargo.toml @@ -40,6 +40,7 @@ name = "datafusion_spark" [dependencies] arrow = { workspace = true } +arrow-buffer = { workspace = true, features = ["pool"] } bigdecimal = { workspace = true } chrono = { workspace = true } crc32fast = "1.4" diff --git a/datafusion/spark/src/function/aggregate/avg.rs b/datafusion/spark/src/function/aggregate/avg.rs index bbcda9b0f8c7f..f382c803220b3 100644 --- a/datafusion/spark/src/function/aggregate/avg.rs +++ 
b/datafusion/spark/src/function/aggregate/avg.rs @@ -23,6 +23,7 @@ use arrow::array::{ }; use arrow::compute::sum; use arrow::datatypes::{DataType, Field, FieldRef}; +use arrow_buffer::MemoryPool; use datafusion_common::types::{NativeType, logical_float64}; use datafusion_common::{Result, ScalarValue, not_impl_err}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; @@ -199,7 +200,7 @@ impl Accumulator for AvgAccumulator { } } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } } @@ -347,7 +348,7 @@ where ]) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { self.counts.capacity() * size_of::() + self.sums.capacity() * size_of::() } } diff --git a/datafusion/spark/src/function/aggregate/try_sum.rs b/datafusion/spark/src/function/aggregate/try_sum.rs index 6509cea26b716..92ceae926a246 100644 --- a/datafusion/spark/src/function/aggregate/try_sum.rs +++ b/datafusion/spark/src/function/aggregate/try_sum.rs @@ -20,6 +20,7 @@ use arrow::datatypes::{ DECIMAL128_MAX_PRECISION, DataType, Decimal128Type, Field, FieldRef, Float64Type, Int64Type, }; +use arrow_buffer::MemoryPool; use datafusion_common::{Result, ScalarValue, downcast_value, exec_err, not_impl_err}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::utils::format_state_name; @@ -116,7 +117,7 @@ impl Accumulator for TrySumAccumulator { evaluate_internal(self) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } } diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index 8bfec86497ef0..c38a5747ac1da 100644 --- a/datafusion/substrait/Cargo.toml +++ b/datafusion/substrait/Cargo.toml @@ -34,6 +34,7 @@ rust-version = { workspace = true } workspace = true [dependencies] +arrow-buffer = { workspace = true, features = ["pool"] } async-recursion = "1.0" async-trait = { workspace = true } chrono 
= { workspace = true } diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 98b35bf082ec4..a089db3c67ac4 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -16,6 +16,7 @@ // under the License. use crate::utils::test::read_json; +use arrow_buffer::MemoryPool; use datafusion::arrow::array::ArrayRef; use datafusion::functions_nested::map::map; use datafusion::logical_expr::LogicalPlanBuilder; @@ -1560,7 +1561,7 @@ async fn roundtrip_aggregate_udf() -> Result<()> { Ok(ScalarValue::Int64(None)) } - fn size(&self) -> usize { + fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize { size_of_val(self) } }