
Force compact in topK when we hit memory limit #19386

@bharath-techie

Description


Is your feature request related to a problem or challenge?

More context in #19216 (comment)
How to reproduce?

Run the ClickBench workload with an 8 GB memory limit and a single partition:

datafusion-cli -m 8G
DataFusion CLI v51.0.0
> SET datafusion.execution.target_partitions=1;
0 row(s) fetched. 
Elapsed 0.001 seconds.

> CREATE EXTERNAL TABLE hits 
STORED AS PARQUET 
LOCATION '/home/ec2-user/clickdata/partitioned/hits/*.parquet';
0 row(s) fetched. 
Elapsed 0.054 seconds.

> SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;
Resources exhausted: Additional allocation failed for TopK[0] with top memory consumers (across reservations) as:
  TopK[0]#4(can spill: false) consumed 7.8 GB, peak 7.8 GB,
  GroupedHashAggregateStream[0] (count(1))#3(can spill: true) consumed 80.4 KB, peak 4.6 GB,
  DataFusion-Cli#2(can spill: false) consumed 0.0 B, peak 0.0 B.
Error: Failed to allocate additional 3.9 GB for TopK[0] with 7.8 GB already allocated for this reservation - 187.0 MB remain available for the total pool
> 

With an 8 GB memory limit and one partition, the query above fails because the hash aggregate step on the URL field is quite heavy.

For each batch that is inserted into and referenced by TopK, the size of the entire batch is added to the size estimate, even when only some of its rows are kept (as mentioned in #9562).

Path : datafusion/datafusion/datafusion/physical-plan/src/topk/mod.rs

    /// Insert a record batch entry into this store, tracking its
    /// memory use, if it has any uses
    pub fn insert(&mut self, entry: RecordBatchEntry) {
        // uses of 0 means that none of the rows in the batch were stored in the topk
        if entry.uses > 0 {
            let size = get_record_batch_memory_size(&entry.batch);
            self.batches_size += size;
            println!("size during insert : {}", size);
            self.batches.insert(entry.id, entry);
        }
    }

    /// Returns the size of memory used by this store, including all
    /// referenced `RecordBatch`es, in bytes
    pub fn size(&self) -> usize {
        // Original expression:
        // size_of::<Self>()
        //     + self.batches.capacity() * (size_of::<u32>() + size_of::<RecordBatchEntry>())
        //     + self.batches_size

        let size_of_self = size_of::<Self>();
        let capacity = self.batches.capacity();
        let entry_size = size_of::<u32>() + size_of::<RecordBatchEntry>();
        let batches_size = self.batches_size;

        let size = size_of_self + capacity * entry_size + batches_size;
        // Debug output; "heap size" is the per-entry size (key + `RecordBatchEntry`)
        println!(
            "self size : {} , capacity : {} , heap size : {}, batch size : {}",
            size_of_self, capacity, entry_size, batches_size
        );
        println!("size during get : {}", size);
        size
    }

Record batch size during insert : 4196909056
self size : 48 , capacity : 3 , heap size : 60, batch size : 4196909056
size during get : 4196909284

Record batch size during insert : 4196909056
self size : 48 , capacity : 3 , heap size : 60, batch size : 8393818112
size during get : 8393818340

Record batch size during insert : 4196909056
self size : 48 , capacity : 3 , heap size : 60, batch size : 12590727168
size during get : 12590727396
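
In the log above, every insert adds the same 4196909056 bytes, so after three inserts the store reports 3 × 4196909056 = 12590727168 bytes of batch memory. One plausible reading, which the issue does not state outright, is that the three batches are zero-copy slices over the same underlying buffer and each one is charged for the whole buffer. The following is a minimal std-only sketch of that effect. `BatchSlice`, `charged_size`, `accounted_size`, `distinct_size`, `logical_size`, and `slices_of_one_buffer` are hypothetical names for illustration, not Arrow or DataFusion APIs.

```rust
use std::rc::Rc;

/// Hypothetical stand-in for a `RecordBatch` that is a zero-copy slice
/// of a larger, reference-counted buffer (not an Arrow type).
struct BatchSlice {
    buffer: Rc<Vec<u8>>,
    /// Number of bytes of `buffer` this slice logically covers.
    len: usize,
}

/// Charges the whole backing buffer to the slice (assumption: this is
/// how the per-batch estimate behaves for sliced batches).
fn charged_size(batch: &BatchSlice) -> usize {
    batch.buffer.len()
}

/// What a store accumulates if it adds `charged_size` on every insert.
fn accounted_size(batches: &[BatchSlice]) -> usize {
    batches.iter().map(charged_size).sum()
}

/// Memory actually held: each distinct buffer is counted once.
fn distinct_size(batches: &[BatchSlice]) -> usize {
    let mut seen: Vec<*const Vec<u8>> = Vec::new();
    let mut total = 0;
    for batch in batches {
        let ptr = Rc::as_ptr(&batch.buffer);
        if !seen.contains(&ptr) {
            seen.push(ptr);
            total += batch.buffer.len();
        }
    }
    total
}

/// Bytes logically covered by the slices themselves.
fn logical_size(batches: &[BatchSlice]) -> usize {
    batches.iter().map(|b| b.len).sum()
}

/// Builds `n` equal slices over a single shared buffer of `buffer_len` bytes.
fn slices_of_one_buffer(buffer_len: usize, n: usize) -> Vec<BatchSlice> {
    let buffer = Rc::new(vec![0u8; buffer_len]);
    (0..n)
        .map(|_| BatchSlice { buffer: Rc::clone(&buffer), len: buffer_len / n })
        .collect()
}
```

With three slices over one 1024-byte buffer, `accounted_size` reports 3072 bytes while `distinct_size` reports 1024, which is the same threefold inflation visible in the log.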

Describe the solution you'd like

Implement forced compaction in TopK once the memory limit is hit, as suggested in #9417 (comment).
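
The idea of compacting when a memory limit is reached can be sketched roughly as follows. This is a simplified std-only model, not DataFusion's actual TopK code: `Batch`, `TopKRow`, `Store`, `compact`, and `compact_if_over_limit` are hypothetical names, rows are plain `String`s, and the limit is a byte count passed in by the caller. Compaction copies only the rows still referenced by the top-k entries into one fresh batch, repoints the entries at it, and drops the old batches.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a record batch: one string value per row.
struct Batch {
    rows: Vec<String>,
}

/// A top-k entry that points at a row inside a stored batch.
struct TopKRow {
    batch_id: u32,
    index: usize,
}

/// Hypothetical batch store keyed by batch id.
#[derive(Default)]
struct Store {
    batches: HashMap<u32, Batch>,
    next_id: u32,
}

impl Store {
    /// Stores a batch and returns its id.
    fn insert(&mut self, rows: Vec<String>) -> u32 {
        let id = self.next_id;
        self.next_id += 1;
        self.batches.insert(id, Batch { rows });
        id
    }

    /// Bytes of row data held by all stored batches.
    fn size(&self) -> usize {
        self.batches
            .values()
            .flat_map(|b| b.rows.iter())
            .map(|r| r.len())
            .sum()
    }
}

/// Copies only the referenced rows into one new batch, repoints the
/// top-k entries at it, and drops every old batch.
fn compact(store: &mut Store, heap: &mut [TopKRow]) {
    let new_id = store.next_id;
    store.next_id += 1;
    let mut rows = Vec::with_capacity(heap.len());
    for (new_index, entry) in heap.iter_mut().enumerate() {
        rows.push(store.batches[&entry.batch_id].rows[entry.index].clone());
        entry.batch_id = new_id;
        entry.index = new_index;
    }
    store.batches.clear();
    store.batches.insert(new_id, Batch { rows });
}

/// Forces a compaction only when the store exceeds `limit` bytes.
/// Returns true if a compaction ran.
fn compact_if_over_limit(store: &mut Store, heap: &mut [TopKRow], limit: usize) -> bool {
    if store.size() > limit {
        compact(store, heap);
        true
    } else {
        false
    }
}
```

In this model the cost of a forced compaction is one copy of at most k rows, while the memory released is everything in the old batches that the top-k entries no longer need.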

@alamb also mentioned this in #9417 (comment) and #16841 (comment). Potentially there are other places where we could apply a similar optimization?

Describe alternatives you've considered

I tried spilling, but forced compaction is the better option here.

Additional context

The main problem is that the same memory is counted multiple times in the size estimate; that is why this particular query hits the memory limit.

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests