Description
Is your feature request related to a problem or challenge?
More context in #19216 (comment)
How to reproduce?
Use the ClickBench workload:
datafusion-cli -m 8G
DataFusion CLI v51.0.0
> SET datafusion.execution.target_partitions=1;
0 row(s) fetched.
Elapsed 0.001 seconds.
> CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION '/home/ec2-user/clickdata/partitioned/hits/*.parquet';
0 row(s) fetched.
Elapsed 0.054 seconds.
> SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;
Resources exhausted: Additional allocation failed for TopK[0] with top memory consumers (across reservations) as:
TopK[0]#4(can spill: false) consumed 7.8 GB, peak 7.8 GB,
GroupedHashAggregateStream[0] (count(1))#3(can spill: true) consumed 80.4 KB, peak 4.6 GB,
DataFusion-Cli#2(can spill: false) consumed 0.0 B, peak 0.0 B.
Error: Failed to allocate additional 3.9 GB for TopK[0] with 7.8 GB already allocated for this reservation - 187.0 MB remain available for the total pool
>
With 8 GB of memory and 1 partition, the query above (SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;) fails because the hash aggregate step on the URL field is quite heavy.
For each batch that is inserted into and referenced by TopK, the entire batch size is added during size estimation [as mentioned in https://github.com/https://github.com//issues/9562].
Path: datafusion/datafusion/datafusion/physical-plan/src/topk/mod.rs (with debug println! calls added)
/// Insert a record batch entry into this store, tracking its
/// memory use, if it has any uses
pub fn insert(&mut self, entry: RecordBatchEntry) {
    // uses of 0 means that none of the rows in the batch were stored in the topk
    if entry.uses > 0 {
        let size = get_record_batch_memory_size(&entry.batch);
        self.batches_size += size;
        println!("size during insert : {}", size);
        self.batches.insert(entry.id, entry);
    }
}

/// returns the size of memory used by this store, including all
/// referenced `RecordBatch`es, in bytes
pub fn size(&self) -> usize {
    // Original expression, split into parts below for debugging:
    // size_of::<Self>()
    //     + self.batches.capacity() * (size_of::<u32>() + size_of::<RecordBatchEntry>())
    //     + self.batches_size
    let size_of_self = size_of::<Self>();
    let capacity = self.batches.capacity();
    let per_entry_size = size_of::<u32>() + size_of::<RecordBatchEntry>();
    let batches_size = self.batches_size;
    let size = size_of_self + capacity * per_entry_size + batches_size;
    println!(
        "self size : {} , capacity : {} , heap size : {}, batch size : {}",
        size_of_self, capacity, per_entry_size, batches_size
    );
    println!("size during get : {}", size);
    size
}
Record batch size during insert : 4196909056
self size : 48 , capacity : 3 , heap size : 60, batch size : 4196909056
size during get : 4196909284
Record batch size during insert size during insert : 4196909056
self size : 48 , capacity : 3 , heap size : 60, batch size : 8393818112
size during get : 8393818340
Record batch size during insert size during insert : 4196909056
self size : 48 , capacity : 3 , heap size : 60, batch size : 12590727168
size during get : 12590727396
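The numbers in the debug output above can be checked by hand. The following is a minimal sketch, not DataFusion code: the function names are hypothetical, and the constants (48, 3, 60, 4196909056) are copied from the printed lines. It shows that each insert charges the same full batch size again, so the reported size grows by about 3.9 GB per insert (assuming 1024-based units), which is consistent with the "7.8 GB already allocated" and "additional 3.9 GB" figures in the error message.

```rust
/// Mirrors the arithmetic of `size()`:
/// size_of::<Self>() + capacity * per_entry + batches_size.
/// Hypothetical helper for illustration only.
fn store_size(size_of_self: u64, capacity: u64, per_entry: u64, batches_size: u64) -> u64 {
    size_of_self + capacity * per_entry + batches_size
}

/// Reported size after `inserts` inserts that each add the same
/// full batch size to `batches_size`, using the constants from the log.
fn reported_after_inserts(inserts: u64, batch_size: u64) -> u64 {
    store_size(48, 3, 60, inserts * batch_size)
}

/// Converts bytes to 1024-based gigabytes (an assumption about the units
/// used in the error message).
fn to_gib(bytes: u64) -> f64 {
    bytes as f64 / (1024.0 * 1024.0 * 1024.0)
}
```

Calling `reported_after_inserts` with 1, 2 and 3 inserts of 4196909056 bytes gives the three "size during get" values from the log; the fixed overhead is only 228 bytes, so almost all of the reported size comes from the repeatedly charged batch size.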
Describe the solution you'd like
Implement forced compaction in TopK once the memory limit is hit, as mentioned in #9417 (comment).
@alamb also mentioned #9417 (comment) and #16841 (comment). Are there potentially other places where we could apply a similar optimization?
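To make the idea concrete, here is a toy model of forced compaction. It is a sketch under stated assumptions, not DataFusion's TopK implementation: `Batch`, `RowRef`, `Store` and `compact` are hypothetical stand-ins, and a "batch" is just a vector of strings. The point it illustrates is that copying only the rows still referenced by the top-k heap into one small batch lets the large source batches be dropped, so the accounted size shrinks to the size of the kept rows.

```rust
use std::collections::HashMap;

/// Simplified stand-in for a record batch: a single string column.
struct Batch {
    rows: Vec<String>,
}

impl Batch {
    /// Approximate bytes held by this batch (string payloads only).
    fn size(&self) -> usize {
        self.rows.iter().map(|r| r.len()).sum()
    }
}

/// A top-k entry that points at one row inside a stored batch.
#[derive(Debug, Clone, Copy, PartialEq)]
struct RowRef {
    batch_id: u32,
    index: usize,
}

/// Hypothetical store: a whole batch stays alive while any row is referenced.
struct Store {
    batches: HashMap<u32, Batch>,
    next_id: u32,
}

impl Store {
    fn new() -> Self {
        Store { batches: HashMap::new(), next_id: 0 }
    }

    /// Stores a batch and returns its id.
    fn insert(&mut self, batch: Batch) -> u32 {
        let id = self.next_id;
        self.next_id += 1;
        self.batches.insert(id, batch);
        id
    }

    /// Accounted size: every stored batch is charged in full.
    fn size(&self) -> usize {
        self.batches.values().map(Batch::size).sum()
    }

    /// Forced compaction: copy the referenced rows into one new batch,
    /// drop all old batches, and repoint the references at the new batch.
    fn compact(&mut self, refs: &mut [RowRef]) {
        let mut rows = Vec::with_capacity(refs.len());
        for r in refs.iter() {
            rows.push(self.batches[&r.batch_id].rows[r.index].clone());
        }
        self.batches.clear();
        let new_id = self.insert(Batch { rows });
        for (i, r) in refs.iter_mut().enumerate() {
            *r = RowRef { batch_id: new_id, index: i };
        }
    }
}
```

In this model, a caller would invoke `compact` when a memory reservation cannot grow, then retry the allocation with the reduced size. The trade-off is the cost of copying the kept rows, which is small when the limit (here LIMIT 10) is small.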
Describe alternatives you've considered
I tried spilling, but forced compaction is the better option here.
Additional context
The main problem is multiple counting of the same memory: that is why this particular query fails with the memory limit.
- feat(memory-tracking): implement arrow_buffer::MemoryPool for MemoryPool #18928 - will this help?
- [DISCUSSION] Memory accounting model discussion #16841 - I see multiple approaches being discussed for fixing the double counting issue.
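The multiple counting described above can be illustrated with a small model. This sketch assumes that the batches inserted into TopK share the same underlying buffers, which the identical per-insert sizes in the debug output suggest but do not prove. `Batch`, `naive_size` and `deduplicated_size` are hypothetical names, and `Rc<Vec<u8>>` stands in for a shared Arrow buffer. It is one possible accounting approach, not necessarily the one proposed in the linked discussions.

```rust
use std::collections::HashSet;
use std::rc::Rc;

/// Stand-in for a batch whose columns reference shared buffers.
struct Batch {
    buffers: Vec<Rc<Vec<u8>>>,
}

/// Naive accounting: every batch is charged the full size of every
/// buffer it references, even if another batch already paid for it.
fn naive_size(batches: &[Batch]) -> usize {
    batches
        .iter()
        .flat_map(|b| b.buffers.iter())
        .map(|buf| buf.len())
        .sum()
}

/// Deduplicated accounting: each distinct buffer is charged once,
/// identified by the address of its shared allocation.
fn deduplicated_size(batches: &[Batch]) -> usize {
    let mut seen = HashSet::new();
    batches
        .iter()
        .flat_map(|b| b.buffers.iter())
        // `insert` returns false for a pointer that was already seen.
        .filter(|buf| seen.insert(Rc::as_ptr(*buf)))
        .map(|buf| buf.len())
        .sum()
}
```

With three batches that all reference one 1024-byte buffer, `naive_size` reports 3072 bytes while `deduplicated_size` reports 1024, mirroring how the reported TopK size in the log grows with each insert even if no new memory is held.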