Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions datafusion-examples/examples/parquet_exec_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::physical_plan::{FileGroup, FileScanConfig, ParquetSource};
use datafusion::datasource::physical_plan::{FileGroup, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionContext;
Expand Down Expand Up @@ -98,24 +98,16 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
// If needed match on a specific `ExecutionPlan` node type
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
let data_source = data_source_exec.data_source();
if let Some(file_config) =
data_source.as_any().downcast_ref::<FileScanConfig>()
if let Some((file_config, _)) =
data_source_exec.downcast_to_file_source::<ParquetSource>()
{
if file_config
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
.is_some()
{
self.file_groups = Some(file_config.file_groups.clone());
self.file_groups = Some(file_config.file_groups.clone());

let metrics = match data_source_exec.metrics() {
None => return Ok(true),
Some(metrics) => metrics,
};
self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
}
let metrics = match data_source_exec.metrics() {
None => return Ok(true),
Some(metrics) => metrics,
};
self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
}
}
Ok(true)
Expand Down
15 changes: 4 additions & 11 deletions datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,11 @@ impl TestParquetFile {
/// on the first one it finds
pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
let data_source = data_source_exec.data_source();
if let Some(maybe_parquet) =
data_source.as_any().downcast_ref::<FileScanConfig>()
if data_source_exec
.downcast_to_file_source::<ParquetSource>()
.is_some()
{
if maybe_parquet
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
.is_some()
{
return data_source_exec.metrics();
}
return data_source_exec.metrics();
}
}

Expand Down
17 changes: 5 additions & 12 deletions datafusion/core/tests/parquet/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Utilities for parquet tests

use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::source::DataSourceExec;
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::{accept, ExecutionPlan, ExecutionPlanVisitor};
Expand Down Expand Up @@ -48,18 +48,11 @@ impl ExecutionPlanVisitor for MetricsFinder {
type Error = std::convert::Infallible;
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
let data_source = data_source_exec.data_source();
if let Some(file_config) =
data_source.as_any().downcast_ref::<FileScanConfig>()
if data_source_exec
.downcast_to_file_source::<ParquetSource>()
.is_some()
{
if file_config
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
.is_some()
{
self.metrics = data_source_exec.metrics();
}
self.metrics = data_source_exec.metrics();
}
}
// stop searching once we have found the metrics
Expand Down
41 changes: 17 additions & 24 deletions datafusion/core/tests/sql/path_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::Arc;

use arrow::datatypes::DataType;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::source::DataSourceExec;
use datafusion::{
datasource::{
Expand Down Expand Up @@ -87,29 +87,22 @@ async fn parquet_partition_pruning_filter() -> Result<()> {
];
let exec = table.scan(&ctx.state(), None, &filters, None).await?;
let data_source_exec = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
let data_source = data_source_exec.data_source();
let file_source = data_source
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap();
let parquet_config = file_source
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
.unwrap();
let pred = parquet_config.predicate().unwrap();
// Only the last filter should be pushdown to TableScan
let expected = Arc::new(BinaryExpr::new(
Arc::new(Column::new_with_schema("id", &exec.schema()).unwrap()),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
));

assert!(pred.as_any().is::<BinaryExpr>());
let pred = pred.as_any().downcast_ref::<BinaryExpr>().unwrap();

assert_eq!(pred, expected.as_ref());

if let Some((_, parquet_config)) =
data_source_exec.downcast_to_file_source::<ParquetSource>()
{
let pred = parquet_config.predicate().unwrap();
// Only the last filter should be pushdown to TableScan
let expected = Arc::new(BinaryExpr::new(
Arc::new(Column::new_with_schema("id", &exec.schema()).unwrap()),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
));

assert!(pred.as_any().is::<BinaryExpr>());
let pred = pred.as_any().downcast_ref::<BinaryExpr>().unwrap();

assert_eq!(pred, expected.as_ref());
}
Ok(())
}

Expand Down
19 changes: 19 additions & 0 deletions datafusion/datasource/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};

use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
Expand Down Expand Up @@ -230,4 +231,22 @@ impl DataSourceExec {
Boundedness::Bounded,
)
}

/// Downcast the `DataSourceExec`'s `data_source` to a specific file source
///
/// Returns `None` if
/// 1. the datasource is not scanning files (`FileScanConfig`)
/// 2. The [`FileScanConfig::file_source`] is not of type `T`
pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
self.data_source()
.as_any()
.downcast_ref::<FileScanConfig>()
.and_then(|file_scan_conf| {
file_scan_conf
.file_source()
.as_any()
.downcast_ref::<T>()
.map(|source| (file_scan_conf, source))
})
}
}
44 changes: 20 additions & 24 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1715,31 +1715,27 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {

#[cfg(feature = "parquet")]
if let Some(exec) = plan.downcast_ref::<DataSourceExec>() {
let data_source_exec = exec.data_source();
if let Some(maybe_parquet) =
data_source_exec.as_any().downcast_ref::<FileScanConfig>()
if let Some((maybe_parquet, conf)) =
exec.downcast_to_file_source::<ParquetSource>()
{
let source = maybe_parquet.file_source();
if let Some(conf) = source.as_any().downcast_ref::<ParquetSource>() {
let predicate = conf
.predicate()
.map(|pred| serialize_physical_expr(pred, extension_codec))
.transpose()?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
protobuf::ParquetScanExecNode {
base_conf: Some(serialize_file_scan_config(
maybe_parquet,
extension_codec,
)?),
predicate,
parquet_options: Some(
conf.table_parquet_options().try_into()?,
),
},
)),
});
}
let predicate = conf
.predicate()
.map(|pred| serialize_physical_expr(pred, extension_codec))
.transpose()?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
protobuf::ParquetScanExecNode {
base_conf: Some(serialize_file_scan_config(
maybe_parquet,
extension_codec,
)?),
predicate,
parquet_options: Some(
conf.table_parquet_options().try_into()?,
),
},
)),
});
}
}

Expand Down
141 changes: 67 additions & 74 deletions datafusion/substrait/src/physical_plan/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion::datasource::source::DataSourceExec;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::{displayable, ExecutionPlan};

use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::physical_plan::ParquetSource;
use substrait::proto::expression::mask_expression::{StructItem, StructSelect};
use substrait::proto::expression::MaskExpression;
use substrait::proto::r#type::{
Expand All @@ -52,89 +52,82 @@ pub fn to_substrait_rel(
),
) -> Result<Box<Rel>> {
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
let data_source = data_source_exec.data_source();
if let Some(file_config) = data_source.as_any().downcast_ref::<FileScanConfig>() {
let is_parquet = file_config
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
.is_some();
if is_parquet {
let mut substrait_files = vec![];
for (partition_index, files) in file_config.file_groups.iter().enumerate()
{
for file in files.iter() {
substrait_files.push(FileOrFiles {
partition_index: partition_index.try_into().unwrap(),
start: 0,
length: file.object_meta.size as u64,
path_type: Some(PathType::UriPath(
file.object_meta.location.as_ref().to_string(),
)),
file_format: Some(FileFormat::Parquet(ParquetReadOptions {})),
});
}
if let Some((file_config, _)) =
data_source_exec.downcast_to_file_source::<ParquetSource>()
{
let mut substrait_files = vec![];
for (partition_index, files) in file_config.file_groups.iter().enumerate() {
for file in files.iter() {
substrait_files.push(FileOrFiles {
partition_index: partition_index.try_into().unwrap(),
start: 0,
length: file.object_meta.size as u64,
path_type: Some(PathType::UriPath(
file.object_meta.location.as_ref().to_string(),
)),
file_format: Some(FileFormat::Parquet(ParquetReadOptions {})),
});
}
}

let mut names = vec![];
let mut types = vec![];
let mut names = vec![];
let mut types = vec![];

for field in file_config.file_schema.fields.iter() {
match to_substrait_type(field.data_type(), field.is_nullable()) {
Ok(t) => {
names.push(field.name().clone());
types.push(t);
}
Err(e) => return Err(e),
for field in file_config.file_schema.fields.iter() {
match to_substrait_type(field.data_type(), field.is_nullable()) {
Ok(t) => {
names.push(field.name().clone());
types.push(t);
}
Err(e) => return Err(e),
}
}

let type_info = Struct {
types,
// FIXME: duckdb doesn't set this field, keep it as default variant 0.
// https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1106-L1127
type_variation_reference: 0,
nullability: Nullability::Required.into(),
};
let type_info = Struct {
types,
// FIXME: duckdb doesn't set this field, keep it as default variant 0.
// https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1106-L1127
type_variation_reference: 0,
nullability: Nullability::Required.into(),
};

let mut select_struct = None;
if let Some(projection) = file_config.projection.as_ref() {
let struct_items = projection
.iter()
.map(|index| StructItem {
field: *index as i32,
// FIXME: duckdb sets this to None, but it's not clear why.
// https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1191
child: None,
})
.collect();
let mut select_struct = None;
if let Some(projection) = file_config.projection.as_ref() {
let struct_items = projection
.iter()
.map(|index| StructItem {
field: *index as i32,
// FIXME: duckdb sets this to None, but it's not clear why.
// https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1191
child: None,
})
.collect();

select_struct = Some(StructSelect { struct_items });
}
select_struct = Some(StructSelect { struct_items });
}

return Ok(Box::new(Rel {
rel_type: Some(RelType::Read(Box::new(ReadRel {
common: None,
base_schema: Some(NamedStruct {
names,
r#struct: Some(type_info),
}),
filter: None,
best_effort_filter: None,
projection: Some(MaskExpression {
select: select_struct,
// FIXME: duckdb set this to true, but it's not clear why.
// https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1186.
maintain_singular_struct: true,
}),
return Ok(Box::new(Rel {
rel_type: Some(RelType::Read(Box::new(ReadRel {
common: None,
base_schema: Some(NamedStruct {
names,
r#struct: Some(type_info),
}),
filter: None,
best_effort_filter: None,
projection: Some(MaskExpression {
select: select_struct,
// FIXME: duckdb set this to true, but it's not clear why.
// https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1186.
maintain_singular_struct: true,
}),
advanced_extension: None,
read_type: Some(ReadType::LocalFiles(LocalFiles {
items: substrait_files,
advanced_extension: None,
read_type: Some(ReadType::LocalFiles(LocalFiles {
items: substrait_files,
advanced_extension: None,
})),
}))),
}));
}
})),
}))),
}));
}
}
Err(DataFusionError::Substrait(format!(
Expand Down