
Commit 06d2573

refine test in repartition.rs & coalesce_batches.rs
1 parent 641338f commit 06d2573

5 files changed (+28, -44 lines)


datafusion/src/physical_plan/coalesce_batches.rs

Lines changed: 1 addition & 19 deletions
@@ -295,9 +295,8 @@ pub fn concat_batches(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::from_slice::FromSlice;
     use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
-    use arrow::array::UInt32Array;
+    use crate::test::create_vec_batches;
     use arrow::datatypes::{DataType, Field, Schema};
 
     #[tokio::test(flavor = "multi_thread")]
@@ -325,23 +324,6 @@ mod tests {
         Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
     }
 
-    fn create_vec_batches(schema: &Arc<Schema>, num_batches: usize) -> Vec<RecordBatch> {
-        let batch = create_batch(schema);
-        let mut vec = Vec::with_capacity(num_batches);
-        for _ in 0..num_batches {
-            vec.push(batch.clone());
-        }
-        vec
-    }
-
-    fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
-        RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
-        )
-        .unwrap()
-    }
-
     async fn coalesce_batches(
         schema: &SchemaRef,
         input_partitions: Vec<Vec<RecordBatch>>,
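
For context, a minimal sketch of how a test in this module can build its input after the change: partitions come from the shared `crate::test::create_vec_batches` helper instead of the deleted local copy, and are wrapped in the `MemoryExec` the test module already imports. The `input_exec` helper name, the batch counts, and the exact `MemoryExec::try_new` signature are assumptions for illustration, not part of this commit.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

use crate::error::Result;
use crate::physical_plan::memory::MemoryExec;
use crate::test::create_vec_batches;

// Hypothetical helper: build a MemoryExec whose partitions are filled by the
// shared create_vec_batches utility (each partition holds ten 8-row batches).
fn input_exec(num_partitions: usize) -> Result<MemoryExec> {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
    let partitions: Vec<_> = (0..num_partitions)
        .map(|_| create_vec_batches(&schema, 10))
        .collect();
    // Assumes MemoryExec::try_new(partitions, schema, projection) keeps its
    // usual DataFusion signature.
    MemoryExec::try_new(&partitions, schema, None)
}
```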

datafusion/src/physical_plan/mod.rs

Lines changed: 2 additions & 4 deletions
@@ -59,7 +59,7 @@ pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync
 /// EmptyRecordBatchStream can be used to create a RecordBatchStream
 /// that will produce no results
 pub struct EmptyRecordBatchStream {
-    /// Schema
+    /// Schema wrapped by Arc
     schema: SchemaRef,
 }
 
@@ -384,9 +384,7 @@ impl Partitioning {
     pub fn partition_count(&self) -> usize {
         use Partitioning::*;
         match self {
-            RoundRobinBatch(n) => *n,
-            Hash(_, n) => *n,
-            UnknownPartitioning(n) => *n,
+            RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
         }
     }
 }
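
The `partition_count` hunk above folds three identical match arms into one using an or-pattern. As a standalone illustration (simplified variant types, not the real DataFusion enum), every alternative in the pattern must bind the same variable with the same type, which is why the three arms can collapse into one:

```rust
// Simplified stand-in for the Partitioning enum; the real Hash variant carries
// physical expressions rather than strings.
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(Vec<String>, usize),
    UnknownPartitioning(usize),
}

fn partition_count(p: &Partitioning) -> usize {
    use Partitioning::*;
    match p {
        // One or-pattern arm replaces three arms that all returned *n.
        RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
    }
}

fn main() {
    assert_eq!(partition_count(&Partitioning::RoundRobinBatch(8)), 8);
    assert_eq!(partition_count(&Partitioning::Hash(vec!["c0".into()], 4)), 4);
    assert_eq!(partition_count(&Partitioning::UnknownPartitioning(1)), 1);
}
```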

datafusion/src/physical_plan/planner.rs

Lines changed: 2 additions & 2 deletions
@@ -226,9 +226,9 @@ pub trait PhysicalPlanner {
     ///
     /// `expr`: the expression to convert
     ///
-    /// `input_dfschema`: the logical plan schema for evaluating `e`
+    /// `input_dfschema`: the logical plan schema for evaluating `expr`
     ///
-    /// `input_schema`: the physical schema for evaluating `e`
+    /// `input_schema`: the physical schema for evaluating `expr`
     fn create_physical_expr(
         &self,
         expr: &Expr,

datafusion/src/physical_plan/repartition.rs

Lines changed: 3 additions & 19 deletions
@@ -447,7 +447,7 @@ struct RepartitionStream {
     /// Number of input partitions that have finished sending batches to this output channel
     num_input_partitions_processed: usize,
 
-    /// Schema
+    /// Schema wrapped by Arc
     schema: SchemaRef,
 
     /// channel containing the repartitioned batches
@@ -494,6 +494,7 @@ impl RecordBatchStream for RepartitionStream {
 mod tests {
     use super::*;
     use crate::from_slice::FromSlice;
+    use crate::test::create_vec_batches;
     use crate::{
         assert_batches_sorted_eq,
         physical_plan::{collect, expressions::col, memory::MemoryExec},
@@ -508,7 +509,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
     use arrow::{
-        array::{ArrayRef, StringArray, UInt32Array},
+        array::{ArrayRef, StringArray},
         error::ArrowError,
     };
     use futures::FutureExt;
@@ -601,23 +602,6 @@ mod tests {
         Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
    }
 
-    fn create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
-        let batch = create_batch(schema);
-        let mut vec = Vec::with_capacity(n);
-        for _ in 0..n {
-            vec.push(batch.clone());
-        }
-        vec
-    }
-
-    fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
-        RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
-        )
-        .unwrap()
-    }
-
     async fn repartition(
         schema: &SchemaRef,
         input_partitions: Vec<Vec<RecordBatch>>,
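
Similarly, a hedged sketch of how the repartition tests can assemble a plan after this change: the input partition comes from the shared helper, and the existing `RepartitionExec::try_new(input, partitioning)` constructor already exercised by these tests wires it to a round-robin `Partitioning`. The `round_robin_plan` helper and the batch count are illustrative assumptions, not part of the commit.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

use crate::error::Result;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::Partitioning;
use crate::test::create_vec_batches;

// Hypothetical helper: one input partition of 50 shared-helper batches,
// repartitioned round-robin into `num_output_partitions` outputs.
fn round_robin_plan(num_output_partitions: usize) -> Result<RepartitionExec> {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
    let input = MemoryExec::try_new(&[create_vec_batches(&schema, 50)], schema, None)?;
    RepartitionExec::try_new(
        Arc::new(input),
        Partitioning::RoundRobinBatch(num_output_partitions),
    )
}
```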

datafusion/src/test/mod.rs

Lines changed: 20 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 //! Common unit test utility methods
 
+use crate::arrow::array::UInt32Array;
 use crate::datasource::object_store::local::local_unpartitioned_file;
 use crate::datasource::{MemTable, PartitionedFile, TableProvider};
 use crate::error::Result;
@@ -212,6 +213,25 @@ pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send
     assert!(poll.is_pending());
 }
 
+/// Create vector batches
+pub fn create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
+    let batch = create_batch(schema);
+    let mut vec = Vec::with_capacity(n);
+    for _ in 0..n {
+        vec.push(batch.clone());
+    }
+    vec
+}
+
+/// Create batch
+fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
+    RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
+    )
+    .unwrap()
+}
+
 pub mod exec;
 pub mod object_store;
 pub mod user_defined;
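
A quick sanity check of what the shared helper produces, assuming the `c0: UInt32` schema the refactored tests use: each call returns `n` clones of one 8-row batch, so `n` batches contribute `8 * n` rows. The test name below is hypothetical.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

use crate::test::create_vec_batches;

#[test]
fn create_vec_batches_produces_clones_of_one_batch() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
    let batches = create_vec_batches(&schema, 3);
    // Three clones of the same 8-row batch: 24 rows in total.
    assert_eq!(batches.len(), 3);
    assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 24);
}
```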
