Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ disallowed-types = [
{ path = "std::time::Instant", reason = "Use `datafusion_common::instant::Instant` instead for WASM compatibility" },
]

ignore-interior-mutability = [
# arrow_buffer::Bytes contains Mutex<MemoryReservation> for memory pool tracking,
# but this doesn't affect Hash/Eq implementations (those only depend on data content).
"arrow_buffer::bytes::Bytes",
]

# Lowering the threshold to help prevent stack overflows (default is 16384)
# See: https://rust-lang.github.io/rust-clippy/master/index.html#/large_futures
future-size-threshold = 10000
Expand Down
1 change: 1 addition & 0 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ workspace = true

[dev-dependencies]
arrow = { workspace = true }
arrow-buffer = { workspace = true, features = ["pool"] }
# arrow_schema is required for record_batch! macro :sad:
arrow-flight = { workspace = true }
arrow-schema = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions datafusion-examples/examples/udf/advanced_udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! See `main.rs` for how to run it.

use arrow::datatypes::{Field, Schema};
use arrow_buffer::MemoryPool;
use datafusion::physical_expr::NullState;
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use std::{any::Any, sync::Arc};
Expand Down Expand Up @@ -195,7 +196,7 @@ impl Accumulator for GeometricMean {
})
}

fn size(&self) -> usize {
fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize {
size_of_val(self)
}
}
Expand Down Expand Up @@ -362,7 +363,7 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
])
}

fn size(&self) -> usize {
fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize {
self.counts.capacity() * size_of::<u32>()
+ self.prods.capacity() * size_of::<Float64Type>()
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion-examples/examples/udf/simple_udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! See `main.rs` for how to run it.
//!
use arrow_buffer::MemoryPool;
/// In this example we will declare a single-type, single return type UDAF that computes the geometric mean.
/// The geometric mean is described here: https://en.wikipedia.org/wiki/Geometric_mean
use datafusion::arrow::{
Expand Down Expand Up @@ -132,7 +133,7 @@ impl Accumulator for GeometricMean {
})
}

fn size(&self) -> usize {
fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize {
size_of_val(self)
}
}
Expand Down
14 changes: 14 additions & 0 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3418,6 +3418,20 @@ impl ScalarValue {
}
}

/// Returns the inner ArrayRef if this ScalarValue contains one
/// (List, LargeList, FixedSizeList, Struct, Map variants).
/// Returns None for primitive types.
pub fn get_array_ref(&self) -> Option<ArrayRef> {
match self {
ScalarValue::List(arr) => Some(Arc::clone(arr) as ArrayRef),
ScalarValue::LargeList(arr) => Some(Arc::clone(arr) as ArrayRef),
ScalarValue::FixedSizeList(arr) => Some(Arc::clone(arr) as ArrayRef),
ScalarValue::Struct(arr) => Some(Arc::clone(arr) as ArrayRef),
ScalarValue::Map(arr) => Some(Arc::clone(arr) as ArrayRef),
_ => None,
}
}

/// Converts a value in `array` at `index` into a ScalarValue
pub fn try_from_array(array: &dyn Array, index: usize) -> Result<Self> {
// handle NULL value
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ extended_tests = []

[dependencies]
arrow = { workspace = true }
arrow-buffer = { workspace = true, features = ["pool"] }
arrow-schema = { workspace = true, features = ["canonical_extension_types"] }
async-trait = { workspace = true }
bytes = { workspace = true }
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/tests/user_defined/user_defined_aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ impl Accumulator for TimeSum {
Ok(ScalarValue::TimestampNanosecond(Some(self.sum), None))
}

fn size(&self) -> usize {
fn size(&self, _pool: Option<&dyn arrow_buffer::MemoryPool>) -> usize {
// accurate size estimates are not important for this example
42
}
Expand Down Expand Up @@ -781,7 +781,7 @@ impl Accumulator for FirstSelector {
self.update_batch(states)
}

fn size(&self) -> usize {
fn size(&self, _pool: Option<&dyn arrow_buffer::MemoryPool>) -> usize {
size_of_val(self)
}
}
Expand Down Expand Up @@ -835,7 +835,7 @@ impl Accumulator for TestGroupsAccumulator {
Ok(ScalarValue::from(self.result))
}

fn size(&self) -> usize {
fn size(&self, _pool: Option<&dyn arrow_buffer::MemoryPool>) -> usize {
size_of::<u64>()
}

Expand Down Expand Up @@ -883,7 +883,7 @@ impl GroupsAccumulator for TestGroupsAccumulator {
Ok(())
}

fn size(&self) -> usize {
fn size(&self, _pool: Option<&dyn arrow_buffer::MemoryPool>) -> usize {
size_of::<u64>()
}
}
Expand Down Expand Up @@ -999,7 +999,7 @@ impl Accumulator for MetadataBasedAccumulator {
Ok(ScalarValue::from(v))
}

fn size(&self) -> usize {
fn size(&self, _pool: Option<&dyn arrow_buffer::MemoryPool>) -> usize {
9
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ async fn udaf_as_window_func() -> Result<()> {
unimplemented!()
}

fn size(&self) -> usize {
fn size(&self, _pool: Option<&dyn arrow_buffer::MemoryPool>) -> usize {
unimplemented!()
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ name = "datafusion_expr_common"

[dependencies]
arrow = { workspace = true }
arrow-buffer = { workspace = true, features = ["pool"] }
datafusion-common = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
Expand Down
33 changes: 26 additions & 7 deletions datafusion/expr-common/src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Accumulator module contains the trait definition for aggregation function's accumulators.

use arrow::array::ArrayRef;
use arrow_buffer::MemoryPool;
use datafusion_common::{Result, ScalarValue, internal_err};
use std::fmt::Debug;

Expand Down Expand Up @@ -71,15 +72,33 @@ pub trait Accumulator: Send + Sync + Debug {
/// when possible (for example distinct strings)
fn evaluate(&mut self) -> Result<ScalarValue>;

/// Returns the allocated size required for this accumulator, in
/// bytes, including `Self`.
/// Returns the allocated memory size required by this accumulator, in bytes.
///
/// This value is used to calculate the memory used during
/// execution so DataFusion can stay within its allotted limit.
/// This value is used to calculate the memory used during execution so
/// DataFusion can stay within its allocated memory limit.
///
/// "Allocated" means that for internal containers such as `Vec`,
/// the `capacity` should be used not the `len`.
fn size(&self) -> usize;
/// # Memory Pool for Shared Buffers
///
/// When `pool` is `None`:
/// - Returns the total memory size including Arrow buffers
/// - This is the default behavior and works for most use cases
///
/// When `pool` is `Some`:
/// - Returns only the structural size (the accumulator struct, Vec capacities, etc.)
/// - Claims Arrow buffers with the provided [`MemoryPool`] for tracking
/// - It is recommended when accumulators may share Arrow array buffers, as the pool
/// automatically deduplicates shared buffers to provide accurate memory accounting
/// - Callers should add `pool.used()` to this return value for the total memory
///
/// # Example
///
/// ```ignore
/// // With memory pool for accurate tracking when buffers may be shared
/// let pool = TrackingMemoryPool::default();
/// let structural_size = accumulator.size(Some(&pool));
/// let total_memory = structural_size + pool.used();
/// ```
fn size(&self, pool: Option<&dyn MemoryPool>) -> usize;

/// Returns the intermediate state of the accumulator, consuming the
/// intermediate state.
Expand Down
32 changes: 27 additions & 5 deletions datafusion/expr-common/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Vectorized [`GroupsAccumulator`]

use arrow::array::{ArrayRef, BooleanArray};
use arrow_buffer::MemoryPool;
use datafusion_common::{Result, not_impl_err};

/// Describes how many rows should be emitted during grouping.
Expand Down Expand Up @@ -244,10 +245,31 @@ pub trait GroupsAccumulator: Send {
false
}

/// Amount of memory used to store the state of this accumulator,
/// in bytes.
/// Returns the allocated memory size required by this accumulator, in bytes.
///
/// This function is called once per batch, so it should be `O(n)` to
/// compute, not `O(num_groups)`
fn size(&self) -> usize;
/// This value is used to calculate the memory used during execution so
/// DataFusion can stay within its allocated memory limit.
///
/// # Memory Pool for Shared Buffers
///
/// When `pool` is `None`:
/// - Returns the total memory size including Arrow buffers
/// - This is the default behavior and works for most use cases
///
/// When `pool` is `Some`:
/// - Returns only the structural size (the accumulator struct, Vec capacities, etc.)
/// - Claims Arrow buffers with the provided [`MemoryPool`] for tracking
/// - It is recommended when accumulators may share Arrow array buffers, as the pool
/// automatically deduplicates shared buffers to provide accurate memory accounting
/// - Callers should add `pool.used()` to this return value for the total memory
///
/// # Example
///
/// ```ignore
/// // With memory pool for accurate tracking when buffers may be shared
/// let pool = TrackingMemoryPool::default();
/// let structural_size = accumulator.size(Some(&pool));
/// let total_memory = structural_size + pool.used();
/// ```
fn size(&self, pool: Option<&dyn MemoryPool>) -> usize;
}
1 change: 1 addition & 0 deletions datafusion/ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ crate-type = ["cdylib", "rlib"]
[dependencies]
abi_stable = "0.11.3"
arrow = { workspace = true, features = ["ffi"] }
arrow-buffer = { workspace = true, features = ["pool"] }
arrow-schema = { workspace = true }
async-ffi = { version = "0.5.0", features = ["abi_stable"] }
async-trait = { workspace = true }
Expand Down
Loading
Loading