Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ pub fn compute_record_batch_statistics(
) -> Statistics {
let nb_rows = batches.iter().flatten().map(RecordBatch::num_rows).sum();

let total_byte_size = batches.iter().flatten().map(batch_byte_size).sum();
let total_byte_size = batches
.iter()
.flatten()
.map(|b| b.get_array_memory_size())
.sum();

let projection = match projection {
Some(p) => p,
Expand Down Expand Up @@ -340,7 +344,7 @@ impl IPCWriter {
self.writer.write(batch)?;
self.num_batches += 1;
self.num_rows += batch.num_rows() as u64;
let num_bytes: usize = batch_byte_size(batch);
let num_bytes: usize = batch.get_array_memory_size();
self.num_bytes += num_bytes as u64;
Ok(())
}
Expand All @@ -357,12 +361,9 @@ impl IPCWriter {
}

/// Returns the total number of bytes of memory occupied physically by this batch.
#[deprecated(since = "28.0.0", note = "RecordBatch::get_array_memory_size")]
pub fn batch_byte_size(batch: &RecordBatch) -> usize {
batch
.columns()
.iter()
.map(|array| array.get_array_memory_size())
.sum()
batch.get_array_memory_size()
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! It will do in-memory sorting if it has enough memory budget
//! but spills to disk if needed.

use crate::physical_plan::common::{batch_byte_size, spawn_buffered, IPCWriter};
use crate::physical_plan::common::{spawn_buffered, IPCWriter};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
Expand Down Expand Up @@ -258,7 +258,7 @@ impl ExternalSorter {
return Ok(());
}

let size = batch_byte_size(&input);
let size = input.get_array_memory_size();
if self.reservation.try_grow(size).is_err() {
let before = self.reservation.size();
self.in_mem_sort().await?;
Expand Down