Description
Is your feature request related to a problem or challenge?
DataFusion spills data to local disk for processing datasets that do not fit in available memory, as illustrated in this comment:
datafusion/datafusion/physical-plan/src/sorts/sort.rs
Lines 88 to 205 in 918ac1b
/// Sorts an arbitrary sized, unsorted, stream of [`RecordBatch`]es to
/// a total order. Depending on the input size and memory manager
/// configuration, writes intermediate results to disk ("spills")
/// using Arrow IPC format.
///
/// # Algorithm
///
/// 1. get a non-empty new batch from input
///
/// 2. check with the memory manager there is sufficient space to
///    buffer the batch in memory 2.1 if memory sufficient, buffer
///    batch in memory, go to 1.
///
/// 2.2 if no more memory is available, sort all buffered batches and
///    spill to file. buffer the next batch in memory, go to 1.
///
/// 3. when input is exhausted, merge all in memory batches and spills
///    to get a total order.
///
/// # When data fits in available memory
///
/// If there is sufficient memory, data is sorted in memory to produce the output
///
/// ```text
///   ┌─────┐
///   │  2  │
///   │  3  │
///   │  1  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
///   │  4  │
///   │  2  │                  │
///   └─────┘                  ▼
///   ┌─────┐
///   │  1  │              In memory
///   │  4  │─ ─ ─ ─ ─ ─▶ sort/merge  ─ ─ ─ ─ ─▶  total sorted output
///   │  1  │
///   └─────┘                  ▲
///     ...                    │
///
///   ┌─────┐                  │
///   │  4  │
///   │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///   └─────┘
///
/// in_mem_batches
///
/// ```
///
/// # When data does not fit in available memory
///
/// When memory is exhausted, data is first sorted and written to one
/// or more spill files on disk:
///
/// ```text
///   ┌─────┐                               .─────────────────.
///   │  2  │                              (                   )
///   │  3  │                              │`─────────────────'│
///   │  1  │─ ─ ─ ─ ─ ─ ─                 │  ┌────┐           │
///   │  4  │              │               │  │ 1  │░          │
///   │  2  │                              │  │... │░          │
///   └─────┘              ▼               │  │ 4  │░  ┌ ─ ─   │
///   ┌─────┐                              │  └────┘░    1  │░ │
///   │  1  │          In memory           │   ░░░░░░  │    ░░ │
///   │  4  │─ ─ ▶    sort/merge   ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
///   │  1  │      and write to file       │           │    ░░ │
///   └─────┘                              │             4  │░ │
///     ...                ▲               │           └░─░─░░ │
///                        │               │            ░░░░░░ │
///   ┌─────┐                              │.─────────────────.│
///   │  4  │              │               (                   )
///   │  3  │─ ─ ─ ─ ─ ─ ─                  `─────────────────'
///   └─────┘
///
/// in_mem_batches                                spills
///                                       (file on disk in Arrow
///                                             IPC format)
/// ```
///
/// Once the input is completely read, the spill files are read and
/// merged with any in memory batches to produce a single total sorted
/// output:
///
/// ```text
///    .─────────────────.
///   (                   )
///   │`─────────────────'│
///   │  ┌────┐           │
///   │  │ 1  │░          │
///   │  │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
///   │  │ 4  │░ ┌────┐   │            │
///   │  └────┘░ │ 1  │░  │            ▼
///   │   ░░░░░░ │    │░  │
///   │          │... │─ ─│─ ─ ─ ▶   merge  ─ ─ ─▶  total sorted output
///   │          │    │░  │
///   │          │ 4  │░  │            ▲
///   │          └────┘░  │            │
///   │           ░░░░░░  │
///   │.─────────────────.│            │
///   (                   )
///    `─────────────────'             │
///          spills
///                                    │
///
///                                    │
///
///   ┌─────┐                          │
///   │  1  │
///   │  4  │─ ─ ─ ─                   │
///   └─────┘        │
///     ...                    In memory
///                  └ ─ ─ ─▶  sort/merge
///   ┌─────┐
///   │  4  │                      ▲
///   │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///   └─────┘
///
/// in_mem_batches
/// ```
struct ExternalSorter {
Here is the code that handles spilling in sort and hash aggregates.
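For readers who want the control flow without the diagrams, here is a minimal sketch of the three steps in the doc comment above (buffer, sort and spill when the budget is exceeded, then merge). It is not DataFusion's implementation: it sorts plain `i64` values instead of `RecordBatch`es, counts values instead of consulting a memory manager, writes raw little-endian bytes instead of Arrow IPC, and collects the merged output into a `Vec` for simplicity. The names `external_sort`, `spill_sorted_run`, `next_value`, and `max_in_memory` (assumed to be at least 1) are all hypothetical.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::fs::{self, File};
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical counter used only to give each spill file a unique name.
static NEXT_SPILL_ID: AtomicUsize = AtomicUsize::new(0);

/// Sort `run`, write it to a new temp file as little-endian i64 values,
/// and clear the in-memory buffer.
fn spill_sorted_run(run: &mut Vec<i64>) -> io::Result<PathBuf> {
    run.sort_unstable();
    let id = NEXT_SPILL_ID.fetch_add(1, Ordering::Relaxed);
    let name = format!("spill-sketch-{}-{}.bin", std::process::id(), id);
    let path = std::env::temp_dir().join(name);
    let mut out = BufWriter::new(File::create(&path)?);
    for v in run.iter() {
        out.write_all(&v.to_le_bytes())?;
    }
    out.flush()?;
    run.clear();
    Ok(path)
}

/// Read the next i64 from a spill file, or `None` at end of file.
fn next_value(reader: &mut BufReader<File>) -> io::Result<Option<i64>> {
    let mut buf = [0u8; 8];
    match reader.read_exact(&mut buf) {
        Ok(()) => Ok(Some(i64::from_le_bytes(buf))),
        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(None),
        Err(e) => Err(e),
    }
}

/// Sort `input` while buffering at most `max_in_memory` values at a time.
fn external_sort(
    input: impl Iterator<Item = i64>,
    max_in_memory: usize,
) -> io::Result<Vec<i64>> {
    let mut buffered: Vec<i64> = Vec::new();
    let mut spills: Vec<PathBuf> = Vec::new();
    for v in input {
        // Step 2.2: no room left, so sort and spill what is buffered.
        if buffered.len() >= max_in_memory {
            spills.push(spill_sorted_run(&mut buffered)?);
        }
        // Step 2.1: buffer the new value in memory.
        buffered.push(v);
    }

    // Step 3: k-way merge of the spilled runs and the in-memory run.
    buffered.sort_unstable();
    let mut readers = Vec::new();
    for p in &spills {
        readers.push(BufReader::new(File::open(p)?));
    }
    let mem_idx = readers.len(); // source index reserved for the in-memory run
    let mut mem = buffered.into_iter();
    let mut heap = BinaryHeap::new(); // min-heap via `Reverse`
    for (i, r) in readers.iter_mut().enumerate() {
        if let Some(v) = next_value(r)? {
            heap.push(Reverse((v, i)));
        }
    }
    if let Some(v) = mem.next() {
        heap.push(Reverse((v, mem_idx)));
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, src))) = heap.pop() {
        out.push(v);
        let next = if src == mem_idx {
            mem.next()
        } else {
            next_value(&mut readers[src])?
        };
        if let Some(n) = next {
            heap.push(Reverse((n, src)));
        }
    }
    drop(readers);
    for p in &spills {
        let _ = fs::remove_file(p); // best-effort cleanup
    }
    Ok(out)
}
```

Calling `external_sort(values.into_iter(), 3)` on ten values would produce three spill files plus one in-memory run, which are then merged through the heap.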
The current version of DataFusion spills data to disk using the Arrow IPC format, which is correct and was easy to get working, but comes with non-trivial overhead, as @andygrove found in Comet.
One potential source of overhead is the validation applied to Arrow IPC files (which in general may have come from untrusted sources). That validation is unnecessary if the data was valid when DataFusion wrote it.
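As an analogy only (this uses the Rust standard library, not any Arrow API), the same trade-off shows up when decoding UTF-8: `String::from_utf8` scans every byte because the input may be untrusted, while `String::from_utf8_unchecked` skips the scan when the producer already guarantees validity. The `TrustedSpill` type and `decode_untrusted` function below are hypothetical names for illustration.

```rust
/// Bytes that may have come from anywhere: validate before use.
/// This performs a full scan of the input.
fn decode_untrusted(bytes: Vec<u8>) -> Result<String, std::string::FromUtf8Error> {
    String::from_utf8(bytes)
}

/// Hypothetical wrapper for bytes this process produced itself.
/// The field is private to the type's API, so the only way to build one
/// is `write`, which copies from a `&str` that is already valid UTF-8.
struct TrustedSpill(Vec<u8>);

impl TrustedSpill {
    fn write(s: &str) -> Self {
        TrustedSpill(s.as_bytes().to_vec())
    }

    fn read(self) -> String {
        // SAFETY: the bytes were copied from a `&str` in `write`,
        // so they are valid UTF-8 and the validation scan can be skipped.
        unsafe { String::from_utf8_unchecked(self.0) }
    }
}
```

The analogy has limits: a spill file lives on disk, so a real implementation would still need to decide how much it trusts the storage layer between write and read.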
Describe the solution you'd like
As part of improving DataFusion's performance on larger-than-memory datasets, I would like to consider adding a new optimized serialization format for use in spill files. This is similar to how we have added optimized (non-Arrow) in-memory storage formats for intermediate results in hash aggregation and other operators.
At a high level this would look like:
- Add a benchmark for spilling files (maybe write/read data to/from a spill file of various sizes)
- Add an optimized Reader/Writer
- Update the SortExec and GroupByHashExec to use this new Reader/Writer
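To make the first two steps concrete, here is a rough sketch of what a specialized reader/writer pair and a round-trip timing helper might look like. Everything here is an assumption for illustration: the names `SpillWriter`, `SpillReader`, and `time_round_trip` are hypothetical, the "format" is just a `u64` row count followed by raw little-endian `i64` values, and a real design would need to handle Arrow's full set of data types, nulls, and schemas. The reader deliberately does no validation, on the assumption that the bytes were written by the same process.

```rust
use std::io::{self, Read, Write};
use std::time::{Duration, Instant};

/// Hypothetical writer: each batch is a u64 row count followed by
/// the values as raw little-endian i64.
struct SpillWriter<W: Write> {
    inner: W,
}

impl<W: Write> SpillWriter<W> {
    fn new(inner: W) -> Self {
        Self { inner }
    }

    fn write_batch(&mut self, values: &[i64]) -> io::Result<()> {
        self.inner.write_all(&(values.len() as u64).to_le_bytes())?;
        for v in values {
            self.inner.write_all(&v.to_le_bytes())?;
        }
        Ok(())
    }

    /// Flush and hand back the underlying sink.
    fn finish(mut self) -> io::Result<W> {
        self.inner.flush()?;
        Ok(self.inner)
    }
}

/// Hypothetical reader for the layout written by `SpillWriter`.
struct SpillReader<R: Read> {
    inner: R,
}

impl<R: Read> SpillReader<R> {
    fn new(inner: R) -> Self {
        Self { inner }
    }

    /// Returns the next batch, or `None` at end of input.
    fn read_batch(&mut self) -> io::Result<Option<Vec<i64>>> {
        let mut len_buf = [0u8; 8];
        match self.inner.read_exact(&mut len_buf) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
            Err(e) => return Err(e),
        }
        // No validation: the row count is trusted because we wrote it.
        let len = u64::from_le_bytes(len_buf) as usize;
        let mut bytes = vec![0u8; len * 8];
        self.inner.read_exact(&mut bytes)?;
        let values = bytes
            .chunks_exact(8)
            .map(|chunk| {
                let mut raw = [0u8; 8];
                raw.copy_from_slice(chunk);
                i64::from_le_bytes(raw)
            })
            .collect();
        Ok(Some(values))
    }
}

/// Write all batches to an in-memory buffer, read them back, and report
/// the elapsed wall-clock time together with the decoded batches.
fn time_round_trip(batches: &[Vec<i64>]) -> io::Result<(Duration, Vec<Vec<i64>>)> {
    let start = Instant::now();
    let mut writer = SpillWriter::new(Vec::new());
    for batch in batches {
        writer.write_batch(batch)?;
    }
    let bytes = writer.finish()?;
    let mut reader = SpillReader::new(bytes.as_slice());
    let mut out = Vec::new();
    while let Some(batch) = reader.read_batch()? {
        out.push(batch);
    }
    Ok((start.elapsed(), out))
}
```

Because the writer and reader are generic over `Write` and `Read`, the same code could be pointed at a `File` to measure disk-backed spills of various sizes, and the timing could be compared against an Arrow IPC round trip of the same data.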
Describe alternatives you've considered
Add a customized Reader/Writer
@andygrove has a PR that adds a customized BatchReader to Comet and seems to offer significant performance improvements for shuffle reading/writing.
We could potentially upstream this code into DataFusion.
Optimize the Arrow IPC reader
Another option would be to continue using the Arrow IPC format, but disable validation:
- Improve Arrow-IPC performance by avoiding Unsafe Unchecked IPC Read RecordBatch arrow-rs#3287
- Optionally disable data validation for arrow-ipc arrow-rs#6933
@totoroyyb has a recent PR to arrow-rs proposing this approach.
Additional context
No response