Skip to content

Commit b814058

Browse files
committed
GroupedHashAggregateStream breaks spill batch
... into smaller chunks to decrease memory required for merging.
1 parent d8e413c commit b814058

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,18 @@ impl GroupedHashAggregateStream {
673673
let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
674674
let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
675675
// TODO: slice large `sorted` and write to multiple files in parallel
676-
writer.write(&sorted)?;
676+
let mut offset = 0;
677+
let total_rows = sorted.num_rows();
678+
679+
while offset < total_rows {
680+
// TODO: we could consider smaller batch size as there may be hundreds of batches
681+
// loaded at the same time.
682+
let length = std::cmp::min(total_rows - offset, self.batch_size);
683+
let batch = sorted.slice(offset, length);
684+
offset += batch.num_rows();
685+
writer.write(&batch)?;
686+
}
687+
677688
writer.finish()?;
678689
self.spill_state.spills.push(spillfile);
679690
Ok(())

0 commit comments

Comments
 (0)