
Optimized spill file format #14078

@alamb

Description


Is your feature request related to a problem or challenge?

DataFusion spills data to local disk for processing datasets that do not fit in available memory, as illustrated in this comment:

/// Sorts an arbitrary sized, unsorted, stream of [`RecordBatch`]es to
/// a total order. Depending on the input size and memory manager
/// configuration, writes intermediate results to disk ("spills")
/// using Arrow IPC format.
///
/// # Algorithm
///
/// 1. get a non-empty new batch from input
///
/// 2. check with the memory manager there is sufficient space to
/// buffer the batch in memory
///
/// 2.1 if memory sufficient, buffer batch in memory, go to 1.
///
/// 2.2 if no more memory is available, sort all buffered batches and
/// spill to file. buffer the next batch in memory, go to 1.
///
/// 3. when input is exhausted, merge all in memory batches and spills
/// to get a total order.
///
/// # When data fits in available memory
///
/// If there is sufficient memory, data is sorted in memory to produce the output
///
/// ```text
/// ┌─────┐
/// │ 2 │
/// │ 3 │
/// │ 1 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
/// │ 4 │
/// │ 2 │ │
/// └─────┘ ▼
/// ┌─────┐
/// │ 1 │ In memory
/// │ 4 │─ ─ ─ ─ ─ ─▶ sort/merge ─ ─ ─ ─ ─▶ total sorted output
/// │ 1 │
/// └─────┘ ▲
/// ... │
///
/// ┌─────┐ │
/// │ 4 │
/// │ 3 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
/// └─────┘
///
/// in_mem_batches
///
/// ```
///
/// # When data does not fit in available memory
///
/// When memory is exhausted, data is first sorted and written to one
/// or more spill files on disk:
///
/// ```text
/// ┌─────┐ .─────────────────.
/// │ 2 │ ( )
/// │ 3 │ │`─────────────────'│
/// │ 1 │─ ─ ─ ─ ─ ─ ─ │ ┌────┐ │
/// │ 4 │ │ │ │ 1 │░ │
/// │ 2 │ │ │... │░ │
/// └─────┘ ▼ │ │ 4 │░ ┌ ─ ─ │
/// ┌─────┐ │ └────┘░ 1 │░ │
/// │ 1 │ In memory │ ░░░░░░ │ ░░ │
/// │ 4 │─ ─ ▶ sort/merge ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
/// │ 1 │ and write to file │ │ ░░ │
/// └─────┘ │ 4 │░ │
/// ... ▲ │ └░─░─░░ │
/// │ │ ░░░░░░ │
/// ┌─────┐ │.─────────────────.│
/// │ 4 │ │ ( )
/// │ 3 │─ ─ ─ ─ ─ ─ ─ `─────────────────'
/// └─────┘
///
/// in_mem_batches spills
/// (file on disk in Arrow
/// IPC format)
/// ```
///
/// Once the input is completely read, the spill files are read and
/// merged with any in memory batches to produce a single total sorted
/// output:
///
/// ```text
/// .─────────────────.
/// ( )
/// │`─────────────────'│
/// │ ┌────┐ │
/// │ │ 1 │░ │
/// │ │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
/// │ │ 4 │░ ┌────┐ │ │
/// │ └────┘░ │ 1 │░ │ ▼
/// │ ░░░░░░ │ │░ │
/// │ │... │─ ─│─ ─ ─ ▶ merge ─ ─ ─▶ total sorted output
/// │ │ │░ │
/// │ │ 4 │░ │ ▲
/// │ └────┘░ │ │
/// │ ░░░░░░ │
/// │.─────────────────.│ │
/// ( )
/// `─────────────────' │
/// spills
/// │
///
/// │
///
/// ┌─────┐ │
/// │ 1 │
/// │ 4 │─ ─ ─ ─ │
/// └─────┘ │
/// ... In memory
/// └ ─ ─ ─▶ sort/merge
/// ┌─────┐
/// │ 4 │ ▲
/// │ 3 │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
/// └─────┘
///
/// in_mem_batches
/// ```
struct ExternalSorter {

Here is the code that handles spilling in sort and hash aggregates.
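The spill algorithm in the doc comment above can be sketched with plain `std` types. This is a deliberately simplified model (batches are `Vec<i32>`, spill files hold newline-separated integers, and the final merge just collects and re-sorts); the real `ExternalSorter` operates on `RecordBatch`es and does a streaming k-way merge:

```rust
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};

/// Simplified model of the external sort described above.
struct TinyExternalSorter {
    in_mem: Vec<i32>,                // buffered values from in-memory batches
    spills: Vec<std::path::PathBuf>, // paths of sorted spill runs on disk
    mem_limit: usize,                // max values to buffer before spilling
}

impl TinyExternalSorter {
    fn new(mem_limit: usize) -> Self {
        Self { in_mem: Vec::new(), spills: Vec::new(), mem_limit }
    }

    /// Steps 1/2: buffer a batch; spill the sorted buffer when "memory" runs out.
    fn insert_batch(&mut self, batch: Vec<i32>) -> std::io::Result<()> {
        if self.in_mem.len() + batch.len() > self.mem_limit {
            self.spill()?;
        }
        self.in_mem.extend(batch);
        Ok(())
    }

    /// Step 2.2: sort all buffered values and write them to a spill file.
    fn spill(&mut self) -> std::io::Result<()> {
        self.in_mem.sort_unstable();
        let path = std::env::temp_dir().join(format!("spill_{}.txt", self.spills.len()));
        let mut w = BufWriter::new(File::create(&path)?);
        for v in self.in_mem.drain(..) {
            writeln!(w, "{v}")?;
        }
        self.spills.push(path);
        Ok(())
    }

    /// Step 3: merge in-memory data with all spill runs into a total order.
    /// (A real merge streams the sorted runs; collect + sort keeps the sketch short.)
    fn finish(mut self) -> std::io::Result<Vec<i32>> {
        let mut all = std::mem::take(&mut self.in_mem);
        for path in &self.spills {
            for line in BufReader::new(File::open(path)?).lines() {
                all.push(line?.parse().expect("spill file written by us"));
            }
        }
        all.sort_unstable();
        Ok(all)
    }
}
```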

The current version of DataFusion spills data to disk using the Arrow IPC format, which is correct and was easy to get working, but comes with non-trivial overhead, as @andygrove found in Comet:

One potential source of overhead is the validation applied when reading Arrow IPC files (which, in general, may come from untrusted sources); this validation is unnecessary when the data was written by DataFusion itself and was valid at write time.
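To illustrate the class of checks involved (this is not arrow-rs's actual validation logic, just a minimal stdlib sketch of a length-prefixed encoding): a reader for untrusted input must verify every structural invariant, while a reader for data the same process wrote can skip them.

```rust
/// Untrusted-input reader: checks the length prefix against the buffer
/// before slicing, returning an error instead of panicking on bad input.
fn read_validated(bytes: &[u8]) -> Result<&[u8], String> {
    if bytes.len() < 4 {
        return Err("truncated length prefix".into());
    }
    let len = u32::from_le_bytes(bytes[..4].try_into().unwrap()) as usize;
    let payload = &bytes[4..];
    if len > payload.len() {
        return Err("length prefix exceeds buffer".into());
    }
    Ok(&payload[..len])
}

/// Trusted reader: skips the checks entirely. This is only sound because
/// the writer (here, the same process) guaranteed the invariants held when
/// the data was serialized -- exactly the situation for a spill file.
fn read_trusted(bytes: &[u8]) -> &[u8] {
    let len = u32::from_le_bytes(bytes[..4].try_into().unwrap()) as usize;
    &bytes[4..4 + len]
}
```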

Describe the solution you'd like

As part of improving DataFusion's performance on larger-than-memory datasets, I would like to consider adding a new optimized serialization format for use in spill files. This is similar to how we have added optimized (non-Arrow) in-memory storage formats for intermediate results in hash aggregation and elsewhere.

At a high level this would look like:

  • Add a benchmark for spilling (e.g., writing/reading spill files of various sizes)
  • Add an optimized Reader/Writer
  • Update SortExec and GroupByHashExec to use the new Reader/Writer

Describe alternatives you've considered

Add a customized Reader/Writer

@andygrove has a PR adding a customized BatchReader to Comet that appears to offer significant performance improvements for shuffle reading/writing:

We could potentially upstream this code into DataFusion.

Optimize the Arrow IPC reader

Another option would be to continue using the Arrow IPC format, but disable validation:

@totoroyyb recently opened a PR to arrow-rs proposing this approach:

Additional context

No response

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Projects: No projects
Milestone: No milestone