From e86b5d85b7a4f92ded2002010bf35cd0c00aa811 Mon Sep 17 00:00:00 2001 From: blaginin Date: Mon, 3 Feb 2025 22:30:03 +0000 Subject: [PATCH 1/2] Serialize `parquet_options` --- datafusion/proto/proto/datafusion.proto | 2 + datafusion/proto/src/generated/pbjson.rs | 18 ++++ datafusion/proto/src/generated/prost.rs | 94 +++++++++---------- datafusion/proto/src/physical_plan/mod.rs | 6 ++ .../tests/cases/roundtrip_physical_plan.rs | 5 + 5 files changed, 77 insertions(+), 48 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 57c2f5f51bbe..3bc884257dab 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1005,6 +1005,8 @@ message ParquetScanExecNode { reserved 2; PhysicalExprNode predicate = 3; + + datafusion_common.TableParquetOptions parquet_options = 4; } message CsvScanExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 0cf893cbc534..add72e4f777e 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -12129,6 +12129,9 @@ impl serde::Serialize for ParquetScanExecNode { if self.predicate.is_some() { len += 1; } + if self.parquet_options.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.ParquetScanExecNode", len)?; if let Some(v) = self.base_conf.as_ref() { struct_ser.serialize_field("baseConf", v)?; @@ -12136,6 +12139,9 @@ impl serde::Serialize for ParquetScanExecNode { if let Some(v) = self.predicate.as_ref() { struct_ser.serialize_field("predicate", v)?; } + if let Some(v) = self.parquet_options.as_ref() { + struct_ser.serialize_field("parquetOptions", v)?; + } struct_ser.end() } } @@ -12149,12 +12155,15 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { "base_conf", "baseConf", "predicate", + "parquet_options", + "parquetOptions", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { BaseConf, Predicate, + ParquetOptions, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -12178,6 +12187,7 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { match value { "baseConf" | "base_conf" => Ok(GeneratedField::BaseConf), "predicate" => Ok(GeneratedField::Predicate), + "parquetOptions" | "parquet_options" => Ok(GeneratedField::ParquetOptions), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -12199,6 +12209,7 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { { let mut base_conf__ = None; let mut predicate__ = None; + let mut parquet_options__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::BaseConf => { @@ -12213,11 +12224,18 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { } predicate__ = map_.next_value()?; } + GeneratedField::ParquetOptions => { + if parquet_options__.is_some() { + return Err(serde::de::Error::duplicate_field("parquetOptions")); + } + parquet_options__ = map_.next_value()?; + } } } Ok(ParquetScanExecNode { base_conf: base_conf__, predicate: predicate__, + parquet_options: parquet_options__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 26efb617e067..d8eaa2b251fa 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -123,10 +123,11 @@ pub struct ListingTableScanNode { pub target_partitions: u32, #[prost(message, repeated, tag = "13")] pub file_sort_order: ::prost::alloc::vec::Vec, - #[prost(oneof = "listing_table_scan_node::FileFormatType", tags = "10, 11, 12, 15")] - pub file_format_type: ::core::option::Option< - listing_table_scan_node::FileFormatType, - >, + #[prost( + oneof = "listing_table_scan_node::FileFormatType", + tags = "10, 11, 12, 15" + )] + pub file_format_type: ::core::option::Option, } /// Nested message and enum types in `ListingTableScanNode`. pub mod listing_table_scan_node { @@ -262,10 +263,8 @@ pub struct CreateExternalTableNode { #[prost(message, optional, tag = "12")] pub constraints: ::core::option::Option, #[prost(map = "string, message", tag = "13")] - pub column_defaults: ::std::collections::HashMap< - ::prost::alloc::string::String, - LogicalExprNode, - >, + pub column_defaults: + ::std::collections::HashMap<::prost::alloc::string::String, LogicalExprNode>, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct PrepareNode { @@ -415,15 +414,7 @@ pub struct DmlNode { /// Nested message and enum types in `DmlNode`. pub mod dml_node { #[derive( - Clone, - Copy, - Debug, - PartialEq, - Eq, - Hash, - PartialOrd, - Ord, - ::prost::Enumeration + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, )] #[repr(i32)] pub enum Type { @@ -1022,9 +1013,7 @@ pub struct FullTableReference { #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableReference { #[prost(oneof = "table_reference::TableReferenceEnum", tags = "1, 2, 3")] - pub table_reference_enum: ::core::option::Option< - table_reference::TableReferenceEnum, - >, + pub table_reference_enum: ::core::option::Option, } /// Nested message and enum types in `TableReference`. pub mod table_reference { @@ -1144,9 +1133,8 @@ pub struct JsonSink { #[prost(message, optional, tag = "1")] pub config: ::core::option::Option, #[prost(message, optional, tag = "2")] - pub writer_options: ::core::option::Option< - super::datafusion_common::JsonWriterOptions, - >, + pub writer_options: + ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct JsonSinkExecNode { @@ -1164,9 +1152,8 @@ pub struct CsvSink { #[prost(message, optional, tag = "1")] pub config: ::core::option::Option, #[prost(message, optional, tag = "2")] - pub writer_options: ::core::option::Option< - super::datafusion_common::CsvWriterOptions, - >, + pub writer_options: + ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct CsvSinkExecNode { @@ -1184,9 +1171,8 @@ pub struct ParquetSink { #[prost(message, optional, tag = "1")] pub config: ::core::option::Option, #[prost(message, optional, tag = "2")] - pub parquet_options: ::core::option::Option< - super::datafusion_common::TableParquetOptions, - >, + pub parquet_options: + ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParquetSinkExecNode { @@ -1310,9 +1296,8 @@ pub struct PhysicalAggregateExprNode { #[prost(bytes = "vec", optional, tag = "7")] pub fun_definition: ::core::option::Option<::prost::alloc::vec::Vec>, #[prost(oneof = "physical_aggregate_expr_node::AggregateFunction", tags = "4")] - pub aggregate_function: ::core::option::Option< - physical_aggregate_expr_node::AggregateFunction, - >, + pub aggregate_function: + ::core::option::Option, } /// Nested message and enum types in `PhysicalAggregateExprNode`. pub mod physical_aggregate_expr_node { @@ -1337,9 +1322,8 @@ pub struct PhysicalWindowExprNode { #[prost(bytes = "vec", optional, tag = "9")] pub fun_definition: ::core::option::Option<::prost::alloc::vec::Vec>, #[prost(oneof = "physical_window_expr_node::WindowFunction", tags = "3, 10")] - pub window_function: ::core::option::Option< - physical_window_expr_node::WindowFunction, - >, + pub window_function: + ::core::option::Option, } /// Nested message and enum types in `PhysicalWindowExprNode`. pub mod physical_window_expr_node { @@ -1517,6 +1501,9 @@ pub struct ParquetScanExecNode { pub base_conf: ::core::option::Option, #[prost(message, optional, tag = "3")] pub predicate: ::core::option::Option, + #[prost(message, optional, tag = "4")] + pub parquet_options: + ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct CsvScanExecNode { @@ -1868,9 +1855,7 @@ pub struct PartitionedFile { #[prost(uint64, tag = "3")] pub last_modified_ns: u64, #[prost(message, repeated, tag = "4")] - pub partition_values: ::prost::alloc::vec::Vec< - super::datafusion_common::ScalarValue, - >, + pub partition_values: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "5")] pub range: ::core::option::Option, #[prost(message, optional, tag = "6")] @@ -1901,9 +1886,8 @@ pub struct RecursiveQueryNode { #[prost(message, optional, boxed, tag = "2")] pub static_term: ::core::option::Option<::prost::alloc::boxed::Box>, #[prost(message, optional, boxed, tag = "3")] - pub recursive_term: ::core::option::Option< - ::prost::alloc::boxed::Box, - >, + pub recursive_term: + ::core::option::Option<::prost::alloc::boxed::Box>, #[prost(bool, tag = "4")] pub is_distinct: bool, } @@ -1914,7 +1898,9 @@ pub struct CteWorkTableScanNode { #[prost(message, optional, tag = "2")] pub schema: ::core::option::Option, } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum WindowFrameUnits { Rows = 0, @@ -1943,7 +1929,9 @@ impl WindowFrameUnits { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum WindowFrameBoundType { CurrentRow = 0, @@ -1972,7 +1960,9 @@ impl WindowFrameBoundType { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum DateUnit { Day = 0, @@ -1998,7 +1988,9 @@ impl DateUnit { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum InsertOp { Append = 0, @@ -2027,7 +2019,9 @@ impl InsertOp { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum PartitionMode { CollectLeft = 0, @@ -2056,7 +2050,9 @@ impl PartitionMode { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum StreamPartitionMode { SinglePartition = 0, @@ -2082,7 +2078,9 @@ impl StreamPartitionMode { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum AggregateMode { Partial = 0, diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index a266d55b46df..9ded9122b7da 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -258,6 +258,11 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { }) .transpose()?; let mut builder = ParquetExec::builder(base_config); + + if let Some(options) = scan.parquet_options.as_ref() { + builder = builder.with_table_parquet_options(options.try_into()?) + } + if let Some(predicate) = predicate { builder = builder.with_predicate(predicate) } @@ -1654,6 +1659,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { extension_codec, )?), predicate, + parquet_options: Some(exec.table_parquet_options().try_into()?), }, )), }); diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 7c34fe068024..50c08024464f 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -730,9 +730,14 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> { Operator::Eq, lit("1"), )); + + let mut options = TableParquetOptions::new(); + options.global.pushdown_filters = true; + roundtrip_test( ParquetExec::builder(scan_config) .with_predicate(predicate) + .with_table_parquet_options(options) .build_arc(), ) } From 42bea3bdb7652b777992c34a8b9de7c5e7595b26 Mon Sep 17 00:00:00 2001 From: blaginin Date: Mon, 3 Feb 2025 22:36:18 +0000 Subject: [PATCH 2/2] Fix format --- datafusion/proto/src/generated/prost.rs | 96 +++++++++++++------------ 1 file changed, 51 insertions(+), 45 deletions(-) diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index d8eaa2b251fa..df32c1a70d61 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -123,11 +123,10 @@ pub struct ListingTableScanNode { pub target_partitions: u32, #[prost(message, repeated, tag = "13")] pub file_sort_order: ::prost::alloc::vec::Vec, - #[prost( - oneof = "listing_table_scan_node::FileFormatType", - tags = "10, 11, 12, 15" - )] - pub file_format_type: ::core::option::Option, + #[prost(oneof = "listing_table_scan_node::FileFormatType", tags = "10, 11, 12, 15")] + pub file_format_type: ::core::option::Option< + listing_table_scan_node::FileFormatType, + >, } /// Nested message and enum types in `ListingTableScanNode`. pub mod listing_table_scan_node { @@ -263,8 +262,10 @@ pub struct CreateExternalTableNode { #[prost(message, optional, tag = "12")] pub constraints: ::core::option::Option, #[prost(map = "string, message", tag = "13")] - pub column_defaults: - ::std::collections::HashMap<::prost::alloc::string::String, LogicalExprNode>, + pub column_defaults: ::std::collections::HashMap< + ::prost::alloc::string::String, + LogicalExprNode, + >, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct PrepareNode { @@ -414,7 +415,15 @@ pub struct DmlNode { /// Nested message and enum types in `DmlNode`. pub mod dml_node { #[derive( - Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration )] #[repr(i32)] pub enum Type { @@ -1013,7 +1022,9 @@ pub struct FullTableReference { #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableReference { #[prost(oneof = "table_reference::TableReferenceEnum", tags = "1, 2, 3")] - pub table_reference_enum: ::core::option::Option, + pub table_reference_enum: ::core::option::Option< + table_reference::TableReferenceEnum, + >, } /// Nested message and enum types in `TableReference`. pub mod table_reference { @@ -1133,8 +1144,9 @@ pub struct JsonSink { #[prost(message, optional, tag = "1")] pub config: ::core::option::Option, #[prost(message, optional, tag = "2")] - pub writer_options: - ::core::option::Option, + pub writer_options: ::core::option::Option< + super::datafusion_common::JsonWriterOptions, + >, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct JsonSinkExecNode { @@ -1152,8 +1164,9 @@ pub struct CsvSink { #[prost(message, optional, tag = "1")] pub config: ::core::option::Option, #[prost(message, optional, tag = "2")] - pub writer_options: - ::core::option::Option, + pub writer_options: ::core::option::Option< + super::datafusion_common::CsvWriterOptions, + >, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct CsvSinkExecNode { @@ -1171,8 +1184,9 @@ pub struct ParquetSink { #[prost(message, optional, tag = "1")] pub config: ::core::option::Option, #[prost(message, optional, tag = "2")] - pub parquet_options: - ::core::option::Option, + pub parquet_options: ::core::option::Option< + super::datafusion_common::TableParquetOptions, + >, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParquetSinkExecNode { @@ -1296,8 +1310,9 @@ pub struct PhysicalAggregateExprNode { #[prost(bytes = "vec", optional, tag = "7")] pub fun_definition: ::core::option::Option<::prost::alloc::vec::Vec>, #[prost(oneof = "physical_aggregate_expr_node::AggregateFunction", tags = "4")] - pub aggregate_function: - ::core::option::Option, + pub aggregate_function: ::core::option::Option< + physical_aggregate_expr_node::AggregateFunction, + >, } /// Nested message and enum types in `PhysicalAggregateExprNode`. pub mod physical_aggregate_expr_node { @@ -1322,8 +1337,9 @@ pub struct PhysicalWindowExprNode { #[prost(bytes = "vec", optional, tag = "9")] pub fun_definition: ::core::option::Option<::prost::alloc::vec::Vec>, #[prost(oneof = "physical_window_expr_node::WindowFunction", tags = "3, 10")] - pub window_function: - ::core::option::Option, + pub window_function: ::core::option::Option< + physical_window_expr_node::WindowFunction, + >, } /// Nested message and enum types in `PhysicalWindowExprNode`. pub mod physical_window_expr_node { @@ -1502,8 +1518,9 @@ pub struct ParquetScanExecNode { #[prost(message, optional, tag = "3")] pub predicate: ::core::option::Option, #[prost(message, optional, tag = "4")] - pub parquet_options: - ::core::option::Option, + pub parquet_options: ::core::option::Option< + super::datafusion_common::TableParquetOptions, + >, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct CsvScanExecNode { @@ -1855,7 +1872,9 @@ pub struct PartitionedFile { #[prost(uint64, tag = "3")] pub last_modified_ns: u64, #[prost(message, repeated, tag = "4")] - pub partition_values: ::prost::alloc::vec::Vec, + pub partition_values: ::prost::alloc::vec::Vec< + super::datafusion_common::ScalarValue, + >, #[prost(message, optional, tag = "5")] pub range: ::core::option::Option, #[prost(message, optional, tag = "6")] @@ -1886,8 +1905,9 @@ pub struct RecursiveQueryNode { #[prost(message, optional, boxed, tag = "2")] pub static_term: ::core::option::Option<::prost::alloc::boxed::Box>, #[prost(message, optional, boxed, tag = "3")] - pub recursive_term: - ::core::option::Option<::prost::alloc::boxed::Box>, + pub recursive_term: ::core::option::Option< + ::prost::alloc::boxed::Box, + >, #[prost(bool, tag = "4")] pub is_distinct: bool, } @@ -1898,9 +1918,7 @@ pub struct CteWorkTableScanNode { #[prost(message, optional, tag = "2")] pub schema: ::core::option::Option, } -#[derive( - Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, -)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum WindowFrameUnits { Rows = 0, @@ -1929,9 +1947,7 @@ impl WindowFrameUnits { } } } -#[derive( - Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, -)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum WindowFrameBoundType { CurrentRow = 0, @@ -1960,9 +1976,7 @@ impl WindowFrameBoundType { } } } -#[derive( - Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, -)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum DateUnit { Day = 0, @@ -1988,9 +2002,7 @@ impl DateUnit { } } } -#[derive( - Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, -)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum InsertOp { Append = 0, @@ -2019,9 +2031,7 @@ impl InsertOp { } } } -#[derive( - Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, -)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum PartitionMode { CollectLeft = 0, @@ -2050,9 +2060,7 @@ impl PartitionMode { } } } -#[derive( - Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, -)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum StreamPartitionMode { SinglePartition = 0, @@ -2078,9 +2086,7 @@ impl StreamPartitionMode { } } } -#[derive( - Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, -)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum AggregateMode { Partial = 0,