Skip to content

Commit 89ed225

Browse files
committed
Use FileScanConfig's own source
1 parent 54bdbb1 commit 89ed225

File tree

2 files changed

+110
-83
lines changed

2 files changed

+110
-83
lines changed

datafusion/datasource/src/file_scan_config.rs

Lines changed: 68 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ impl DataSource for FileScanConfig {
244244
}
245245

246246
fn statistics(&self) -> Result<Statistics> {
247-
self.file_source.statistics()
247+
Ok(self.projected_stats())
248248
}
249249

250250
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
@@ -324,13 +324,7 @@ impl FileScanConfig {
324324

325325
/// Set the file source
326326
pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
327-
let (
328-
_projected_schema,
329-
_constraints,
330-
projected_statistics,
331-
_projected_output_ordering,
332-
) = self.project();
333-
self.file_source = file_source.with_statistics(projected_statistics);
327+
self.file_source = file_source;
334328
self
335329
}
336330

@@ -346,10 +340,69 @@ impl FileScanConfig {
346340
self
347341
}
348342

343+
fn projection_indices(&self) -> Vec<usize> {
344+
match &self.projection {
345+
Some(proj) => proj.clone(),
346+
None => (0..self.file_schema.fields().len()
347+
+ self.table_partition_cols.len())
348+
.collect(),
349+
}
350+
}
351+
352+
fn projected_stats(&self) -> Statistics {
353+
let table_cols_stats = self
354+
.projection_indices()
355+
.into_iter()
356+
.map(|idx| {
357+
if idx < self.file_schema.fields().len() {
358+
self.statistics.column_statistics[idx].clone()
359+
} else {
360+
// TODO provide accurate stat for partition column (#1186)
361+
ColumnStatistics::new_unknown()
362+
}
363+
})
364+
.collect();
365+
366+
Statistics {
367+
num_rows: self.statistics.num_rows,
368+
// TODO correct byte size?
369+
total_byte_size: Precision::Absent,
370+
column_statistics: table_cols_stats,
371+
}
372+
}
373+
374+
fn projected_schema(&self) -> Arc<Schema> {
375+
let table_fields: Vec<_> = self
376+
.projection_indices()
377+
.into_iter()
378+
.map(|idx| {
379+
if idx < self.file_schema.fields().len() {
380+
self.file_schema.field(idx).clone()
381+
} else {
382+
let partition_idx = idx - self.file_schema.fields().len();
383+
self.table_partition_cols[partition_idx].clone()
384+
}
385+
})
386+
.collect();
387+
388+
Arc::new(Schema::new_with_metadata(
389+
table_fields,
390+
self.file_schema.metadata().clone(),
391+
))
392+
}
393+
394+
fn projected_constraints(&self) -> Constraints {
395+
let indexes = self.projection_indices();
396+
397+
self.constraints
398+
.project(&indexes)
399+
.unwrap_or_else(Constraints::empty)
400+
}
401+
349402
/// Set the projection of the files
350403
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
351404
self.projection = projection;
352-
self.with_updated_statistics()
405+
self
353406
}
354407

355408
/// Set the limit of the files
@@ -358,32 +411,6 @@ impl FileScanConfig {
358411
self
359412
}
360413

361-
// Update source statistics with the current projection data
362-
fn with_updated_statistics(mut self) -> Self {
363-
let max_projection_column = *self
364-
.projection
365-
.as_ref()
366-
.and_then(|proj| proj.iter().max())
367-
.unwrap_or(&0);
368-
369-
if max_projection_column
370-
>= self.file_schema.fields().len() + self.table_partition_cols.len()
371-
{
372-
// we don't yet have enough information (file schema info or partition column info) to perform projection
373-
return self;
374-
}
375-
376-
let (
377-
_projected_schema,
378-
_constraints,
379-
projected_statistics,
380-
_projected_output_ordering,
381-
) = self.project();
382-
383-
self.file_source = self.file_source.with_statistics(projected_statistics);
384-
self
385-
}
386-
387414
/// Add a file as a single group
388415
///
389416
/// See [Self::file_groups] for more information.
@@ -413,7 +440,7 @@ impl FileScanConfig {
413440
/// Set the partitioning columns of the files
414441
pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
415442
self.table_partition_cols = table_partition_cols;
416-
self.with_updated_statistics()
443+
self
417444
}
418445

419446
/// Set the output ordering of the files
@@ -459,54 +486,13 @@ impl FileScanConfig {
459486
);
460487
}
461488

462-
let proj_indices = if let Some(proj) = &self.projection {
463-
proj
464-
} else {
465-
let len = self.file_schema.fields().len() + self.table_partition_cols.len();
466-
&(0..len).collect::<Vec<_>>()
467-
};
468-
469-
let mut table_fields = vec![];
470-
let mut table_cols_stats = vec![];
471-
for idx in proj_indices {
472-
if *idx < self.file_schema.fields().len() {
473-
let field = self.file_schema.field(*idx);
474-
table_fields.push(field.clone());
475-
table_cols_stats.push(self.statistics.column_statistics[*idx].clone())
476-
} else {
477-
let partition_idx = idx - self.file_schema.fields().len();
478-
table_fields.push(self.table_partition_cols[partition_idx].to_owned());
479-
// TODO provide accurate stat for partition column (#1186)
480-
table_cols_stats.push(ColumnStatistics::new_unknown())
481-
}
482-
}
483-
484-
let table_stats = Statistics {
485-
num_rows: self.statistics.num_rows,
486-
// TODO correct byte size?
487-
total_byte_size: Precision::Absent,
488-
column_statistics: table_cols_stats,
489-
};
490-
491-
let projected_schema = Arc::new(Schema::new_with_metadata(
492-
table_fields,
493-
self.file_schema.metadata().clone(),
494-
));
495-
496-
let projected_constraints = self
497-
.constraints
498-
.project(proj_indices)
499-
.unwrap_or_else(Constraints::empty);
489+
let schema = self.projected_schema();
490+
let constraints = self.projected_constraints();
491+
let stats = self.projected_stats();
500492

501-
let projected_output_ordering =
502-
get_projected_output_ordering(self, &projected_schema);
493+
let output_ordering = get_projected_output_ordering(self, &schema);
503494

504-
(
505-
projected_schema,
506-
projected_constraints,
507-
table_stats,
508-
projected_output_ordering,
509-
)
495+
(schema, constraints, stats, output_ordering)
510496
}
511497

512498
#[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
4848
use datafusion::datasource::object_store::ObjectStoreUrl;
4949
use datafusion::datasource::physical_plan::{
5050
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
51-
FileSinkConfig, ParquetSource,
51+
FileSinkConfig, FileSource, ParquetSource,
5252
};
5353
use datafusion::execution::FunctionRegistry;
5454
use datafusion::functions_aggregate::sum::sum_udaf;
@@ -1579,3 +1579,44 @@ async fn roundtrip_coalesce() -> Result<()> {
15791579

15801580
Ok(())
15811581
}
1582+
1583+
#[tokio::test]
1584+
async fn roundtrip_projection_source() -> Result<()> {
1585+
let schema = Arc::new(Schema::new(Fields::from([
1586+
Arc::new(Field::new("a", DataType::Utf8, false)),
1587+
Arc::new(Field::new("b", DataType::Utf8, false)),
1588+
Arc::new(Field::new("c", DataType::Int32, false)),
1589+
Arc::new(Field::new("d", DataType::Int32, false)),
1590+
])));
1591+
1592+
let statistics = Statistics::new_unknown(&schema);
1593+
1594+
let source = ParquetSource::default().with_statistics(statistics.clone());
1595+
let scan_config = FileScanConfig {
1596+
object_store_url: ObjectStoreUrl::local_filesystem(),
1597+
file_groups: vec![vec![PartitionedFile::new(
1598+
"/path/to/file.parquet".to_string(),
1599+
1024,
1600+
)]],
1601+
constraints: Constraints::empty(),
1602+
statistics,
1603+
file_schema: schema.clone(),
1604+
projection: Some(vec![0, 1, 2]),
1605+
limit: None,
1606+
table_partition_cols: vec![],
1607+
output_ordering: vec![],
1608+
file_compression_type: FileCompressionType::UNCOMPRESSED,
1609+
new_lines_in_values: false,
1610+
file_source: source,
1611+
};
1612+
1613+
let filter = Arc::new(
1614+
FilterExec::try_new(
1615+
Arc::new(BinaryExpr::new(col("c", &schema)?, Operator::Eq, lit(1))),
1616+
scan_config.build(),
1617+
)?
1618+
.with_projection(Some(vec![0, 1]))?,
1619+
);
1620+
1621+
roundtrip_test(filter)
1622+
}

0 commit comments

Comments (0)