
Un-cancellable Query when hitting many large files. #14036

@jeffreyssmith2nd


Describe the bug

TL;DR: Reading many large Parquet files can prevent a query from being cancelled.

We have a customer that is running a query similar to the following (edited for privacy):

SELECT DISTINCT "A","B","C","D","E" FROM table where "time" > now() - INTERVAL '5 days';

This produces a fairly straightforward plan (see explain.txt).

Simplified Plan:

AggregateExec mode=FinalPartitioned
  CoalesceBatchesExec target_batch_size=8192
    RepartitionExec input_partitions=4
      AggregateExec mode=Partial
        ParquetExec file_groups={4 groups}

This query reads ~85 Parquet files of ~100MB each. What we've seen is that even when the query is cancelled, the resources associated with it (CPU/RAM) are still being utilized for almost as long as a typical execution of the query.

To Reproduce

I have struggled to come up with a good reproducer for this that doesn't rely on the customer data. I would welcome help here if anyone has a shareable dataset that matches this pattern.

Expected behavior

Cancelling a query should (within some reasonable amount of time) truly cancel the query, freeing up system resources for other query executions.

Additional context

This appears to be a problem with the interaction between the GroupedHashAggregateStream and FileStream approaches to yielding.

The GroupedHashAggregateStream polls its child stream (in this case a FileStream) in a loop until the child is exhausted, errors, or returns Pending.
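
For illustration, here is a minimal sketch of that consuming pattern (simplified Rust with hypothetical Batch/GroupState/AggStream types, not the actual DataFusion code): the aggregating stream polls its input in a tight loop and only hands control back to the executor when the input returns Pending or ends.

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};

// Hypothetical stand-ins for the real DataFusion types.
struct Batch;

struct GroupState;

impl GroupState {
    fn update(&mut self, _batch: Batch) { /* hash-aggregate the batch */ }
    fn emit(&mut self) -> Option<Batch> { None }
}

struct AggStream<S> {
    input: S,
    state: GroupState,
}

impl<S> Stream for AggStream<S>
where
    S: Stream<Item = Result<Batch, String>> + Unpin,
{
    type Item = Result<Batch, String>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // Keep polling the child until it ends, errors, or returns Pending.
        // If the child never returns Pending, this loop never hands control
        // back to the Tokio executor, so cancellation cannot take effect.
        loop {
            match this.input.poll_next_unpin(cx) {
                Poll::Ready(Some(Ok(batch))) => this.state.update(batch),
                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                Poll::Ready(None) => return Poll::Ready(this.state.emit().map(Ok)),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}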

The FileStream loops while attempting to read RecordBatches from the underlying file, while also doing some bookkeeping to stage the next File for reading. This Stream returns Ready when a RecordBatch is produced or an Error is encountered; however, it never returns Pending. When a File is finished being read, the next File is swapped in and reading continues from the new File.
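
As another rough sketch (reusing the Batch type and imports from above, with a hypothetical FileReader type rather than the real FileStream internals), the problematic shape is that the file boundary is handled inside the same poll, so Pending is never returned when moving to the next file:

use std::collections::VecDeque;

// A hypothetical per-file reader that yields record batches
// (e.g. a Parquet decoder wrapped in a stream).
type FileReader = Box<dyn Stream<Item = Result<Batch, String>> + Unpin + Send>;

struct FileScanStream {
    current: Option<FileReader>,
    remaining: VecDeque<FileReader>,
}

impl Stream for FileScanStream {
    type Item = Result<Batch, String>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            let Some(reader) = this.current.as_mut() else {
                // No more files: the stream is exhausted.
                return Poll::Ready(None);
            };
            match reader.poll_next_unpin(cx) {
                // A batch (or error) is ready: return it to the caller.
                Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
                // Current file finished: swap in the next file and keep
                // looping inside this same poll, never returning Pending
                // across the file boundary.
                Poll::Ready(None) => this.current = this.remaining.pop_front(),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}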

The combination of these two behaviours means that if there are many large files being read by the FileStream, the GroupedHashAggregateStream doesn't effectively yield back to Tokio.

My PR fixes this by having the FileStream return Pending when swapping over to a new file. This seems like a natural yield point, and it resolves the cancellation issue.
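
Sketched against the simplified FileScanStream above (an illustration of the idea, not the actual patch), the file boundary becomes an explicit yield point:

impl FileScanStream {
    // Variant of poll_next that yields at each file boundary (a sketch of
    // the proposed behaviour, not the real DataFusion change).
    fn poll_next_yielding(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Batch, String>>> {
        let Some(reader) = self.current.as_mut() else {
            return Poll::Ready(None);
        };
        match reader.poll_next_unpin(cx) {
            Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
            Poll::Ready(None) => {
                // Stage the next file, then return Pending so the task
                // yields back to Tokio. Waking first guarantees a prompt
                // re-poll; a cancelled query is simply never polled again,
                // so its CPU/RAM are released.
                self.current = self.remaining.pop_front();
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

Waking the waker before returning Pending keeps throughput essentially unchanged for queries that are still wanted, while giving the runtime a chance to drop the task if the query has been cancelled.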
