# Break GroupedHashAggregateStream spill batch into smaller chunks #8004
```diff
@@ -673,7 +673,16 @@ impl GroupedHashAggregateStream {
     let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
     let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
     // TODO: slice large `sorted` and write to multiple files in parallel
-    writer.write(&sorted)?;
+    let mut offset = 0;
+    let total_rows = sorted.num_rows();
+
+    while offset < total_rows {
+        let length = std::cmp::min(total_rows - offset, self.batch_size);
+        let batch = sorted.slice(offset, length);
+        offset += batch.num_rows();
+        writer.write(&batch)?;
+    }
+
```
**Comment on lines 675 to +685**

**Reviewer:** Is it possible to write in parallel, so that this becomes less blocking? An additional improvement would be chunking before sorting. I remember the discussion was

**Author:** I don't see a big benefit to writing in parallel, but we can give it a try at a later date.

**Reviewer:** Got it. Additionally, I think we have to create a new writer; otherwise we keep appending to the same temp file, so we end up with the same file size?

**Author:** Sorry for the confusion: not smaller files, smaller batches. We still keep one file per invocation of the spill method. The previous implementation had just one batch per file; this change introduces more than one batch per file. So the file size should be similar to the previous implementation, probably even a bit bigger, but the same file will contain more than one batch. The streaming merge will open the same number of files as before, but it will load smaller batches into memory (with one batch per file, the whole file would have to be loaded).

**Reviewer:** Thank you for explaining!
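
To make that last point concrete, here is a minimal read-side sketch. It is not DataFusion's streaming-merge code; it uses the plain arrow `FileReader` and a hypothetical `read_spill_in_chunks` helper to show why a spill file holding many small batches can be consumed one chunk at a time:

```rust
use std::fs::File;

use arrow::error::ArrowError;
use arrow::ipc::reader::FileReader;

/// Read a spill file batch by batch; peak memory is bounded by the largest
/// single batch in the file rather than by the whole file.
fn read_spill_in_chunks(path: &str) -> Result<usize, ArrowError> {
    let reader = FileReader::try_new(File::open(path)?, None)?;
    let mut rows = 0;
    // Each iteration materializes only one of the small batches written by
    // the chunked spill, which is the memory win described above.
    for batch in reader {
        rows += batch?.num_rows();
    }
    Ok(rows)
}
```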
```diff
     writer.finish()?;
     self.spill_state.spills.push(spillfile);
     Ok(())
```
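
For reference, the write side of the same technique can be sketched as a self-contained function. This is only an illustration against the public arrow crate, with a hypothetical `spill_in_chunks` helper and arrow's `FileWriter` standing in for DataFusion's internal `IPCWriter` wrapper:

```rust
use std::fs::File;

use arrow::error::ArrowError;
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;

/// Write one large sorted batch to a single IPC file as many
/// `batch_size`-row chunks, mirroring the loop in the diff above.
fn spill_in_chunks(
    sorted: &RecordBatch,
    batch_size: usize,
    path: &str,
) -> Result<(), ArrowError> {
    let file = File::create(path)?;
    let mut writer = FileWriter::try_new(file, &sorted.schema())?;

    let total_rows = sorted.num_rows();
    let mut offset = 0;
    while offset < total_rows {
        let length = std::cmp::min(total_rows - offset, batch_size);
        // `slice` is zero-copy: it reuses the underlying buffers, so chunking
        // does not duplicate the data in memory before it is written out.
        let chunk = sorted.slice(offset, length);
        writer.write(&chunk)?;
        offset += length;
    }
    // Still one file per spill, but many batches inside it.
    writer.finish()
}
```

The key property is that `RecordBatch::slice` only adjusts offsets into the existing buffers, so chunking the writes costs no extra memory while capping the size of each IPC message the reader must later materialize.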
**Reviewer:** Is this related?

**Author:** It is. With the original value it panics with `ResourcesExhausted` in the merge phase, so I have increased it. I did not do an in-depth analysis of the problem.