Skip to content

Commit a84f329

Browse files
committed
Adding test for memory manager
1 parent 476e58b commit a84f329

File tree

1 file changed

+163
-2
lines changed

1 file changed

+163
-2
lines changed

datafusion/src/execution/memory_manager.rs

Lines changed: 163 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ impl Display for MemoryConsumerId {
6868
#[async_trait]
6969
/// A memory consumer that either takes up memory (of type `ConsumerType::Tracking`)
7070
/// or grows/shrinks memory usage based on available memory (of type `ConsumerType::Requesting`).
71-
pub trait MemoryConsumer: Send + Sync + Debug {
71+
pub trait MemoryConsumer: Send + Sync {
7272
/// Display name of the consumer
7373
fn name(&self) -> String;
7474

@@ -113,7 +113,7 @@ pub trait MemoryConsumer: Send + Sync + Debug {
113113
fn mem_used(&self) -> usize;
114114
}
115115

116-
impl Display for dyn MemoryConsumer {
116+
impl Debug for dyn MemoryConsumer {
117117
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
118118
write!(
119119
f,
@@ -125,6 +125,12 @@ impl Display for dyn MemoryConsumer {
125125
}
126126
}
127127

128+
impl Display for dyn MemoryConsumer {
129+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
130+
write!(f, "{}[{}]", self.name(), self.id(),)
131+
}
132+
}
133+
128134
/*
129135
The memory management architecture is the following:
130136
@@ -284,3 +290,158 @@ fn human_readable_size(size: usize) -> String {
284290
};
285291
format!("{:.1} {}", value, unit)
286292
}
293+
294+
#[cfg(test)]
295+
mod tests {
296+
use crate::error::Result;
297+
use crate::execution::memory_manager::{
298+
ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
299+
};
300+
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
301+
use async_trait::async_trait;
302+
use std::sync::atomic::{AtomicUsize, Ordering};
303+
use std::sync::Arc;
304+
305+
struct DummyRequester {
306+
id: MemoryConsumerId,
307+
runtime: Arc<RuntimeEnv>,
308+
spills: AtomicUsize,
309+
mem_used: AtomicUsize,
310+
}
311+
312+
impl DummyRequester {
313+
fn new(partition: usize, runtime: Arc<RuntimeEnv>) -> Self {
314+
Self {
315+
id: MemoryConsumerId::new(partition),
316+
runtime,
317+
spills: AtomicUsize::new(0),
318+
mem_used: AtomicUsize::new(0),
319+
}
320+
}
321+
322+
fn set_used(&self, used: usize) {
323+
self.mem_used.store(used, Ordering::SeqCst);
324+
}
325+
326+
fn get_spills(&self) -> usize {
327+
self.spills.load(Ordering::SeqCst)
328+
}
329+
}
330+
331+
#[async_trait]
332+
impl MemoryConsumer for DummyRequester {
333+
fn name(&self) -> String {
334+
"dummy".to_owned()
335+
}
336+
337+
fn id(&self) -> &MemoryConsumerId {
338+
&self.id
339+
}
340+
341+
fn memory_manager(&self) -> Arc<MemoryManager> {
342+
self.runtime.memory_manager.clone()
343+
}
344+
345+
fn type_(&self) -> &ConsumerType {
346+
&ConsumerType::Requesting
347+
}
348+
349+
async fn spill(&self) -> Result<()> {
350+
self.spills.fetch_add(1, Ordering::SeqCst);
351+
self.mem_used.store(0, Ordering::SeqCst);
352+
Ok(())
353+
}
354+
355+
fn mem_used(&self) -> usize {
356+
self.mem_used.load(Ordering::SeqCst)
357+
}
358+
}
359+
360+
struct DummyTracker {
361+
id: MemoryConsumerId,
362+
runtime: Arc<RuntimeEnv>,
363+
mem_used: usize,
364+
}
365+
366+
impl DummyTracker {
367+
fn new(partition: usize, runtime: Arc<RuntimeEnv>, mem_used: usize) -> Self {
368+
Self {
369+
id: MemoryConsumerId::new(partition),
370+
runtime,
371+
mem_used,
372+
}
373+
}
374+
}
375+
376+
#[async_trait]
377+
impl MemoryConsumer for DummyTracker {
378+
fn name(&self) -> String {
379+
"dummy".to_owned()
380+
}
381+
382+
fn id(&self) -> &MemoryConsumerId {
383+
&self.id
384+
}
385+
386+
fn memory_manager(&self) -> Arc<MemoryManager> {
387+
self.runtime.memory_manager.clone()
388+
}
389+
390+
fn type_(&self) -> &ConsumerType {
391+
&ConsumerType::Tracking
392+
}
393+
394+
async fn spill(&self) -> Result<()> {
395+
Ok(())
396+
}
397+
398+
fn mem_used(&self) -> usize {
399+
self.mem_used
400+
}
401+
}
402+
403+
#[tokio::test]
404+
async fn basic_functionalities() -> Result<()> {
405+
let config = RuntimeConfig::new()
406+
.with_memory_fraction(1.0)
407+
.with_max_execution_memory(100);
408+
let runtime = Arc::new(RuntimeEnv::new(config)?);
409+
410+
let tracker1 = Arc::new(DummyTracker::new(0, runtime.clone(), 5));
411+
runtime.register_consumer(&(tracker1.clone() as Arc<dyn MemoryConsumer>));
412+
assert_eq!(runtime.memory_manager.get_tracker_total(), 5);
413+
414+
let tracker2 = Arc::new(DummyTracker::new(0, runtime.clone(), 10));
415+
runtime.register_consumer(&(tracker2.clone() as Arc<dyn MemoryConsumer>));
416+
assert_eq!(runtime.memory_manager.get_tracker_total(), 15);
417+
418+
let tracker3 = Arc::new(DummyTracker::new(0, runtime.clone(), 15));
419+
runtime.register_consumer(&(tracker3.clone() as Arc<dyn MemoryConsumer>));
420+
assert_eq!(runtime.memory_manager.get_tracker_total(), 30);
421+
422+
runtime.drop_consumer(tracker2.id());
423+
assert_eq!(runtime.memory_manager.get_tracker_total(), 20);
424+
425+
let requester1 = Arc::new(DummyRequester::new(0, runtime.clone()));
426+
runtime.register_consumer(&(requester1.clone() as Arc<dyn MemoryConsumer>));
427+
428+
// first requester entered, should be able to use any of the remaining 80
429+
requester1.set_used(40);
430+
requester1.try_grow(10).await?;
431+
assert_eq!(requester1.get_spills(), 0);
432+
433+
let requester2 = Arc::new(DummyRequester::new(0, runtime.clone()));
434+
runtime.register_consumer(&(requester2.clone() as Arc<dyn MemoryConsumer>));
435+
436+
requester2.set_used(20);
437+
requester2.try_grow(30).await?;
438+
assert_eq!(requester2.get_spills(), 1);
439+
assert_eq!(requester2.mem_used(), 0);
440+
441+
requester1.try_grow(10).await?;
442+
assert_eq!(requester1.get_spills(), 1);
443+
assert_eq!(requester1.mem_used(), 0);
444+
445+
Ok(())
446+
}
447+
}

0 commit comments

Comments
 (0)