Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 56 additions & 10 deletions datafusion/execution/src/memory_pool/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,35 @@ pub trait VecAllocExt {
/// `accounting` by any newly allocated bytes.
///
/// Note that allocation counts capacity, not size
///
/// # Example:
/// ```
/// # use datafusion_execution::memory_pool::proxy::VecAllocExt;
/// // use allocated to incrementally track how much memory is allocated in the vec
/// let mut allocated = 0;
/// let mut vec = Vec::new();
/// // Push data into the vec and the accounting will be updated to reflect
/// // memory allocation
/// vec.push_accounted(1, &mut allocated);
/// assert_eq!(allocated, 16); // space for 4 u32s
/// vec.push_accounted(1, &mut allocated);
/// assert_eq!(allocated, 16); // no new allocation needed
///
/// // push more data into the vec
/// for _ in 0..10 { vec.push_accounted(1, &mut allocated); }
/// assert_eq!(allocated, 64); // underlying vec has space for 10 u32s
/// assert_eq!(vec.allocated_size(), 64);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The allocated size was different than what was obtained by push_accounted prior to this change

/// ```
/// # Example with other allocations:
/// ```
/// # use datafusion_execution::memory_pool::proxy::VecAllocExt;
/// // You can use the same allocated size to track memory allocated by
/// // another source. For example
/// let mut allocated = 27;
/// let mut vec = Vec::new();
/// vec.push_accounted(1, &mut allocated); // allocates 16 bytes for vec
/// assert_eq!(allocated, 43); // 16 bytes for vec, 27 bytes for other
/// ```
fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);

/// Return the amount of memory allocated by this Vec to store elements
Expand All @@ -36,24 +65,41 @@ pub trait VecAllocExt {
/// Note this calculation is not recursive, and does not include any heap
/// allocations contained within the Vec's elements. Does not include the
/// size of `self`
///
/// # Example:
/// ```
/// # use datafusion_execution::memory_pool::proxy::VecAllocExt;
/// let mut vec = Vec::new();
/// // Push data into the vec and the accounting will be updated to reflect
/// // memory allocation
/// vec.push(1);
/// assert_eq!(vec.allocated_size(), 16); // space for 4 u32s
/// vec.push(1);
/// assert_eq!(vec.allocated_size(), 16); // no new allocation needed
///
/// // push more data into the vec
/// for _ in 0..10 { vec.push(1); }
/// assert_eq!(vec.allocated_size(), 64); // space for 64 now
/// ```
fn allocated_size(&self) -> usize;
}

impl<T> VecAllocExt for Vec<T> {
type T = T;

fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
if self.capacity() == self.len() {
// allocate more

// growth factor: 2, but at least 2 elements
let bump_elements = (self.capacity() * 2).max(2);
let bump_size = std::mem::size_of::<u32>() * bump_elements;
Copy link
Contributor Author

@alamb alamb Jan 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be sizeof<T> rather than sizeof<u32> but perhaps I am missing something

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks correct. I suspect this was the cause of the discrepancy in the original code

self.reserve(bump_elements);
let prev_capacty = self.capacity();
self.push(x);
let new_capacity = self.capacity();
if new_capacity > prev_capacty {
// capacity changed, so we allocated more
let bump_size = (new_capacity - prev_capacty) * std::mem::size_of::<T>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it make sense to do a checked_mul here similar to the checked_add below?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

either that or remove the checked_add because push would panic first now that it's called first instead of at the end

Copy link
Contributor

@erratic-pattern erratic-pattern Jan 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking about it more: if accounting is tracking multiple vecs, then this makes sense. multiplication should never overflow because push would panic first in that case, but the checked_add could potentially overflow since accounting could be some value greater than capacity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is a good point. I will add a comment to clarify

// Note multiplication should never overflow because `push` would
// have panic'd first, but the checked_add could potentially
// overflow since accounting could be tracking additional values, and
// could be greater than what is stored in the Vec
*accounting = (*accounting).checked_add(bump_size).expect("overflow");
}

self.push(x);
}
fn allocated_size(&self) -> usize {
std::mem::size_of::<T>() * self.capacity()
Expand All @@ -69,7 +115,7 @@ pub trait RawTableAllocExt {
/// `accounting` by any newly allocated bytes.
///
/// Returns the bucket where the element was inserted.
/// Note that allocation counts capacity, not size.
/// Note that allocation counts capacity, not size.
///
/// # Example:
/// ```
Expand Down