
OOM in GroupedHashAggregateStream::group_aggregate_batch() #13831

@avantgardnerio

Description


Describe the bug

When accumulating large text fields under a GROUP BY, group_aggregate_batch() can OOM despite ostensibly tracking its allocations with the MemoryPool.

Query:

select truncated_time, count(*) AS cnt
from (
    select
        truncated_time, k8s_deployment_name, message
    from (
        SELECT
            priorityclass,
            timestamp,
            date_trunc('day', timestamp) AS truncated_time,
            k8s_deployment_name,
            message
        FROM agg_oom
        where priorityclass != 'low'
    )
    group by truncated_time, k8s_deployment_name, message
) group by truncated_time

This was run against 8 parquet files of roughly 50 MB each, where the message column can contain strings of up to 8192 bytes. When profiled, this call was by far the largest consumer of memory:

(profiler screenshot omitted: group_aggregate_batch() dominating memory usage)

With logging enabled, we can see that it fails while interning:

converting 3 rows
interning 8192 rows with 1486954 bytes
interned 8192 rows, now I'm 13054176 bytes
resizing to 14103171
resizing to 14103171
reserving 28206342 extra bytes
converting 3 rows
interning 8192 rows with 1350859 bytes
memory allocation of 25690112 bytes failed
Aborted (core dumped)

To Reproduce

1. Set up a test with the following runtime configuration (a fuller sketch of such a reproducer follows after this list):

        let memory_limit = 125_000_000;
        let memory_fraction = 1.0;
        let rt_config = RuntimeConfig::new()
            .with_memory_limit(memory_limit, memory_fraction);

2. Set ulimit -v 1152000.

3. Query some parquet files containing long strings.
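
Below is a minimal sketch of what such a reproducer could look like, assuming the DataFusion Rust API; the parquet path is a placeholder, and constructor names such as RuntimeEnv::new and SessionContext::new_with_config_rt may differ between DataFusion versions:

    use std::sync::Arc;
    use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // 125 MB memory pool, all of which the query is allowed to use.
        let memory_limit = 125_000_000;
        let memory_fraction = 1.0;
        let rt_config = RuntimeConfig::new().with_memory_limit(memory_limit, memory_fraction);
        let runtime = Arc::new(RuntimeEnv::new(rt_config)?);

        let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), runtime);
        // Placeholder path: point this at the ~50 MB parquet files with long message strings.
        ctx.register_parquet("agg_oom", "/path/to/agg_oom/", ParquetReadOptions::default())
            .await?;

        // The aggregation query from the bug report.
        let sql = "
            select truncated_time, count(*) AS cnt
            from (
                select truncated_time, k8s_deployment_name, message
                from (
                    SELECT priorityclass, timestamp,
                           date_trunc('day', timestamp) AS truncated_time,
                           k8s_deployment_name, message
                    FROM agg_oom
                    where priorityclass != 'low'
                )
                group by truncated_time, k8s_deployment_name, message
            ) group by truncated_time";
        ctx.sql(sql).await?.show().await?;
        Ok(())
    }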

Expected behavior

group_aggregate_batch() should not rely on the assumption expressed in this comment:

            // Here we can ignore `insufficient_capacity_err` because we will spill later,
            // but at least one batch should fit in the memory

It should instead account for the fact that adding 1 row to a buffer of a million rows doesn't allocate space for 1,000,001 entries, but rather for 2,000,000, because the underlying Vec grows by doubling its capacity when it resizes.
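
To illustrate the doubling behavior (this is plain Rust standard-library behavior, not DataFusion code):

    fn main() {
        // Fill a Vec exactly to its capacity.
        let mut v: Vec<u8> = vec![0u8; 1_000_000];
        println!("capacity before push: {}", v.capacity()); // 1_000_000

        // Pushing one more element forces a reallocation; Vec roughly doubles
        // its capacity, so the new allocation is about 2,000,000 bytes, not 1,000,001.
        v.push(0);
        println!("capacity after push:  {}", v.capacity()); // typically 2_000_000
    }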

Additional context

Proposed solution:

Add the following line:

            self.reservation.try_resize(self.reservation.size() * 2)?;

immediately above the existing call to:

            self.group_values
                .intern(group_values, &mut self.current_group_indices)?;
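
For clarity, this is how the two lines would read together; the names (self.reservation, self.group_values, self.current_group_indices) are taken from the snippets above, and this is a sketch of the intent rather than a verbatim patch:

            // Grow the reservation to twice its current size before interning, so
            // that a doubling resize inside intern() is already accounted for by
            // the MemoryPool instead of failing with an unchecked allocation.
            self.reservation.try_resize(self.reservation.size() * 2)?;

            // Existing call that interns the incoming group values.
            self.group_values
                .intern(group_values, &mut self.current_group_indices)?;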
