
Commit 4ef66aa

[fix] adding extra partitioning col for csv avro json
parquet exec still TODO
Parent: 3913979

File tree

32 files changed, +608 −454 lines changed

ballista/rust/core/proto/ballista.proto

Lines changed: 4 additions & 1 deletion

@@ -294,7 +294,7 @@ message ListingTableScanNode {
   ProjectionColumns projection = 4;
   Schema schema = 5;
   repeated LogicalExprNode filters = 6;
-  repeated string partitions = 7;
+  repeated string table_partition_dims = 7;
   bool collect_stat = 8;
   uint32 target_partitions = 9;
   oneof FileFormatType {
@@ -613,6 +613,7 @@ message ParquetScanExecNode {
   repeated uint32 projection = 6;
   ScanLimit limit = 7;
   Statistics statistics = 8;
+  repeated string table_partition_dims = 9;
 }

 message CsvScanExecNode {
@@ -624,6 +625,7 @@ message CsvScanExecNode {
   repeated uint32 projection = 6;
   ScanLimit limit = 7;
   Statistics statistics = 8;
+  repeated string table_partition_dims = 9;
 }

 message AvroScanExecNode {
@@ -633,6 +635,7 @@ message AvroScanExecNode {
   repeated uint32 projection = 6;
   ScanLimit limit = 7;
   Statistics statistics = 8;
+  repeated string table_partition_dims = 9;
 }

 enum PartitionMode {

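For orientation before the Rust changes: the renamed proto field maps onto the `table_partition_dims` field of `ListingOptions` that the diffs below read and write. A hypothetical construction sketch, in the spirit of the `get_table` change in benchmarks/src/bin/tpch.rs further down — the field set comes from this commit, but the concrete values (including the "date" column) are invented for illustration:

    // Hypothetical sketch only: field names are taken from the ListingOptions
    // initializers in this commit; the values and the "date" column are made up.
    let options = ListingOptions {
        file_extension: ".csv".to_owned(),
        format: Arc::new(CsvFormat::default()),
        // names of the Hive-style partition columns encoded in the file paths,
        // e.g. .../date=2021-09-28/part-0.csv
        table_partition_dims: vec!["date".to_string()],
        collect_stat: true,
        target_partitions: 8,
    };
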
ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 1 addition & 1 deletion

@@ -166,7 +166,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                 let options = ListingOptions {
                     file_extension: scan.file_extension.clone(),
                     format: file_format,
-                    partitions: scan.partitions.clone(),
+                    table_partition_dims: scan.table_partition_dims.clone(),
                     collect_stat: scan.collect_stat,
                     target_partitions: scan.target_partitions as usize,
                 };

ballista/rust/core/src/serde/logical_plan/to_proto.rs

Lines changed: 4 additions & 1 deletion

@@ -735,7 +735,10 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                         .options()
                         .file_extension
                         .clone(),
-                    partitions: listing_table.options().partitions.clone(),
+                    table_partition_dims: listing_table
+                        .options()
+                        .table_partition_dims
+                        .clone(),
                     path: listing_table.table_path().to_owned(),
                     schema: Some(schema),
                     projection,

ballista/rust/core/src/serde/physical_plan/from_proto.rs

Lines changed: 39 additions & 28 deletions

@@ -35,6 +35,7 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::catalog::catalog::{
     CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
 };
+use datafusion::datasource::file_format::PhysicalPlanConfig;
 use datafusion::datasource::object_store::local::LocalFileSystem;
 use datafusion::datasource::object_store::{FileMeta, ObjectStoreRegistry, SizedFile};
 use datafusion::datasource::PartitionedFile;
@@ -124,18 +125,22 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                 let statistics = convert_required!(scan.statistics)?;

                 Ok(Arc::new(CsvExec::new(
-                    Arc::new(LocalFileSystem {}),
-                    scan.file_groups
-                        .iter()
-                        .map(|f| f.try_into())
-                        .collect::<Result<Vec<_>, _>>()?,
-                    statistics,
-                    schema,
+                    PhysicalPlanConfig {
+                        object_store: Arc::new(LocalFileSystem {}),
+                        file_schema: schema,
+                        file_groups: scan
+                            .file_groups
+                            .iter()
+                            .map(|f| f.try_into())
+                            .collect::<Result<Vec<_>, _>>()?,
+                        statistics,
+                        projection: Some(projection),
+                        batch_size: scan.batch_size as usize,
+                        limit: scan.limit.as_ref().map(|sl| sl.limit as usize),
+                        table_partition_dims: vec![],
+                    },
                     scan.has_header,
                     str_to_byte(&scan.delimiter)?,
-                    Some(projection),
-                    scan.batch_size as usize,
-                    scan.limit.as_ref().map(|sl| sl.limit as usize),
                 )))
             }
             PhysicalPlanType::ParquetScan(scan) => {
@@ -144,37 +149,43 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                 let statistics = convert_required!(scan.statistics)?;

                 Ok(Arc::new(ParquetExec::new(
-                    Arc::new(LocalFileSystem {}),
-                    scan.file_groups
-                        .iter()
-                        .map(|p| p.try_into())
-                        .collect::<Result<Vec<_>, _>>()?,
-                    statistics,
-                    schema,
-                    Some(projection),
+                    PhysicalPlanConfig {
+                        object_store: Arc::new(LocalFileSystem {}),
+                        file_schema: schema,
+                        file_groups: scan
+                            .file_groups
+                            .iter()
+                            .map(|f| f.try_into())
+                            .collect::<Result<Vec<_>, _>>()?,
+                        statistics,
+                        projection: Some(projection),
+                        batch_size: scan.batch_size as usize,
+                        limit: scan.limit.as_ref().map(|sl| sl.limit as usize),
+                        table_partition_dims: scan.table_partition_dims.clone(),
+                    },
                     // TODO predicate should be de-serialized
                     None,
-                    scan.batch_size as usize,
-                    scan.limit.as_ref().map(|sl| sl.limit as usize),
                 )))
             }
             PhysicalPlanType::AvroScan(scan) => {
                 let schema = Arc::new(convert_required!(scan.schema)?);
                 let projection = scan.projection.iter().map(|i| *i as usize).collect();
                 let statistics = convert_required!(scan.statistics)?;

-                Ok(Arc::new(AvroExec::new(
-                    Arc::new(LocalFileSystem {}),
-                    scan.file_groups
+                Ok(Arc::new(AvroExec::new(PhysicalPlanConfig {
+                    object_store: Arc::new(LocalFileSystem {}),
+                    file_schema: schema,
+                    file_groups: scan
+                        .file_groups
                         .iter()
                         .map(|f| f.try_into())
                         .collect::<Result<Vec<_>, _>>()?,
                     statistics,
-                    schema,
-                    Some(projection),
-                    scan.batch_size as usize,
-                    scan.limit.as_ref().map(|sl| sl.limit as usize),
-                )))
+                    projection: Some(projection),
+                    batch_size: scan.batch_size as usize,
+                    limit: scan.limit.as_ref().map(|sl| sl.limit as usize),
+                    table_partition_dims: vec![],
+                })))
             }
             PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
                 let input: Arc<dyn ExecutionPlan> =

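The `PhysicalPlanConfig` struct that these constructors now take is imported from `datafusion::datasource::file_format` but not defined in this diff. A sketch of its shape, inferred purely from the initializers above — the field names match the diff, while the exact types are assumptions:

    // Inferred sketch, not the actual definition: the types are guessed from how
    // the fields are filled in above (e.g. `scan.batch_size as usize`, nested
    // Vecs of PartitionedFile for file_groups).
    pub struct PhysicalPlanConfig {
        pub object_store: Arc<dyn ObjectStore>,
        pub file_schema: SchemaRef,
        pub file_groups: Vec<Vec<PartitionedFile>>,
        pub statistics: Statistics,
        pub projection: Option<Vec<usize>>,
        pub batch_size: usize,
        pub limit: Option<usize>,
        // names of the table partitioning columns to append to the file schema;
        // in this commit CSV and Avro pass an empty vec here, while the Parquet
        // branch forwards the value deserialized from the proto
        pub table_partition_dims: Vec<String>,
    }
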
ballista/rust/core/src/serde/physical_plan/to_proto.rs

Lines changed: 3 additions & 0 deletions

@@ -272,6 +272,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                         has_header: exec.has_header(),
                         delimiter: byte_to_string(exec.delimiter())?,
                         batch_size: exec.batch_size() as u32,
+                        table_partition_dims: exec.table_partition_dims().to_vec(),
                     },
                 )),
             })
@@ -298,6 +299,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                             .map(|n| *n as u32)
                             .collect(),
                         batch_size: exec.batch_size() as u32,
+                        table_partition_dims: exec.table_partition_dims().to_vec(),
                     },
                 )),
             })
@@ -328,6 +330,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                             .collect(),
                         schema: Some(exec.file_schema().as_ref().into()),
                         batch_size: exec.batch_size() as u32,
+                        table_partition_dims: exec.table_partition_dims().to_vec(),
                     },
                 )),
             })

benchmarks/src/bin/tpch.rs

Lines changed: 1 addition & 1 deletion

@@ -496,7 +496,7 @@ fn get_table(
         file_extension: extension.to_owned(),
         target_partitions,
         collect_stat: true,
-        partitions: vec![],
+        table_partition_dims: vec![],
     };

     Ok(Arc::new(ListingTable::new(

datafusion/src/datasource/file_format/avro.rs

Lines changed: 18 additions & 21 deletions

@@ -29,6 +29,7 @@ use super::{FileFormat, PhysicalPlanConfig};
 use crate::avro_to_arrow::read_avro_schema_from_reader;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
+use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::AvroExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
@@ -61,16 +62,9 @@ impl FileFormat for AvroFormat {
     async fn create_physical_plan(
         &self,
         conf: PhysicalPlanConfig,
+        _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let exec = AvroExec::new(
-            conf.object_store,
-            conf.files,
-            conf.statistics,
-            conf.schema,
-            conf.projection,
-            conf.batch_size,
-            conf.limit,
-        );
+        let exec = AvroExec::new(conf);
         Ok(Arc::new(exec))
     }
 }
@@ -346,26 +340,29 @@ mod tests {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/{}", testdata, file_name);
         let format = AvroFormat {};
-        let schema = format
+        let file_schema = format
             .infer_schema(local_object_reader_stream(vec![filename.clone()]))
             .await
             .expect("Schema inference");
         let statistics = format
             .infer_stats(local_object_reader(filename.clone()))
             .await
             .expect("Stats inference");
-        let files = vec![vec![local_unpartitioned_file(filename.to_owned())]];
+        let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
         let exec = format
-            .create_physical_plan(PhysicalPlanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                schema,
-                files,
-                statistics,
-                projection: projection.clone(),
-                batch_size,
-                filters: vec![],
-                limit,
-            })
+            .create_physical_plan(
+                PhysicalPlanConfig {
+                    object_store: Arc::new(LocalFileSystem {}),
+                    file_schema,
+                    file_groups,
+                    statistics,
+                    projection: projection.clone(),
+                    batch_size,
+                    limit,
+                    table_partition_dims: vec![],
+                },
+                &[],
+            )
             .await?;
         Ok(exec)
     }

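All three formats now receive the pushed-down filter expressions as a separate argument instead of a `filters` field on the config (note the removed `filters: vec![]` in the old test above). A hedged sketch of what the updated `FileFormat` method presumably looks like, assembled from the `impl FileFormat for ...` blocks in this commit; the `#[async_trait]` attribute and the trait bounds are assumptions:

    // Sketch inferred from the implementations in avro.rs, csv.rs and json.rs;
    // the real trait lives in datafusion::datasource::file_format.
    #[async_trait]
    pub trait FileFormat: Send + Sync {
        /// Build the physical scan for this format. Filters travel separately
        /// from the config so formats that can prune (e.g. Parquet) may use
        /// them; the formats touched here ignore them (`_filters`).
        async fn create_physical_plan(
            &self,
            conf: PhysicalPlanConfig,
            filters: &[Expr],
        ) -> Result<Arc<dyn ExecutionPlan>>;
    }
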
datafusion/src/datasource/file_format/csv.rs

Lines changed: 18 additions & 23 deletions

@@ -28,6 +28,7 @@ use futures::StreamExt;
 use super::{FileFormat, PhysicalPlanConfig};
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
+use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::CsvExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
@@ -123,18 +124,9 @@ impl FileFormat for CsvFormat {
     async fn create_physical_plan(
         &self,
         conf: PhysicalPlanConfig,
+        _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let exec = CsvExec::new(
-            conf.object_store,
-            conf.files,
-            conf.statistics,
-            conf.schema,
-            self.has_header,
-            self.delimiter,
-            conf.projection,
-            conf.batch_size,
-            conf.limit,
-        );
+        let exec = CsvExec::new(conf, self.has_header, self.delimiter);
         Ok(Arc::new(exec))
     }
 }
@@ -260,26 +252,29 @@ mod tests {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/csv/{}", testdata, file_name);
         let format = CsvFormat::default();
-        let schema = format
+        let file_schema = format
             .infer_schema(local_object_reader_stream(vec![filename.clone()]))
             .await
             .expect("Schema inference");
         let statistics = format
             .infer_stats(local_object_reader(filename.clone()))
             .await
             .expect("Stats inference");
-        let files = vec![vec![local_unpartitioned_file(filename.to_owned())]];
+        let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
        let exec = format
-            .create_physical_plan(PhysicalPlanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                schema,
-                files,
-                statistics,
-                projection: projection.clone(),
-                batch_size,
-                filters: vec![],
-                limit,
-            })
+            .create_physical_plan(
+                PhysicalPlanConfig {
+                    object_store: Arc::new(LocalFileSystem {}),
+                    file_schema,
+                    file_groups,
+                    statistics,
+                    projection: projection.clone(),
+                    batch_size,
+                    limit,
+                    table_partition_dims: vec![],
+                },
+                &[],
+            )
             .await?;
         Ok(exec)
     }

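`CsvExec` itself is not shown in this excerpt, but its constructor is called both here and in the Ballista deserializer above, so its new signature is presumably the shared config plus the CSV-specific options. A sketch of that signature only, inferred from the call sites; the parameter names and body are assumptions:

    impl CsvExec {
        /// Inferred signature: called as CsvExec::new(conf, has_header, delimiter)
        /// in this commit; delimiter is assumed to be a single byte because the
        /// serializer round-trips it through str_to_byte / byte_to_string.
        pub fn new(base_config: PhysicalPlanConfig, has_header: bool, delimiter: u8) -> Self {
            unimplemented!("sketch only; the actual CsvExec is not part of this excerpt")
        }
    }
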
datafusion/src/datasource/file_format/json.rs

Lines changed: 18 additions & 21 deletions

@@ -32,6 +32,7 @@ use super::FileFormat;
 use super::PhysicalPlanConfig;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
+use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::NdJsonExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
@@ -93,16 +94,9 @@ impl FileFormat for JsonFormat {
     async fn create_physical_plan(
         &self,
         conf: PhysicalPlanConfig,
+        _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let exec = NdJsonExec::new(
-            conf.object_store,
-            conf.files,
-            conf.statistics,
-            conf.schema,
-            conf.projection,
-            conf.batch_size,
-            conf.limit,
-        );
+        let exec = NdJsonExec::new(conf);
         Ok(Arc::new(exec))
     }
 }
@@ -211,26 +205,29 @@ mod tests {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let filename = "tests/jsons/2.json";
         let format = JsonFormat::default();
-        let schema = format
+        let file_schema = format
             .infer_schema(local_object_reader_stream(vec![filename.to_owned()]))
             .await
             .expect("Schema inference");
         let statistics = format
             .infer_stats(local_object_reader(filename.to_owned()))
             .await
             .expect("Stats inference");
-        let files = vec![vec![local_unpartitioned_file(filename.to_owned())]];
+        let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
         let exec = format
-            .create_physical_plan(PhysicalPlanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                schema,
-                files,
-                statistics,
-                projection: projection.clone(),
-                batch_size,
-                filters: vec![],
-                limit,
-            })
+            .create_physical_plan(
+                PhysicalPlanConfig {
+                    object_store: Arc::new(LocalFileSystem {}),
+                    file_schema,
+                    file_groups,
+                    statistics,
+                    projection: projection.clone(),
+                    batch_size,
+                    limit,
+                    table_partition_dims: vec![],
+                },
+                &[],
+            )
             .await?;
         Ok(exec)
     }
