diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index fa5d1f442754..b3e9418c9f9c 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1548,18 +1548,22 @@ impl serde::Serialize for CsvOptions { let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvOptions", len)?; if !self.has_header.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("hasHeader", pbjson::private::base64::encode(&self.has_header).as_str())?; } if !self.delimiter.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("delimiter", pbjson::private::base64::encode(&self.delimiter).as_str())?; } if !self.quote.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("quote", pbjson::private::base64::encode(&self.quote).as_str())?; } if !self.escape.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("escape", pbjson::private::base64::encode(&self.escape).as_str())?; } if self.compression != 0 { @@ -1569,6 +1573,7 @@ impl serde::Serialize for CsvOptions { } if self.schema_infer_max_rec != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&self.schema_infer_max_rec).as_str())?; } if !self.date_format.is_empty() { @@ -1591,18 +1596,22 @@ impl serde::Serialize for CsvOptions { } if !self.comment.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("comment", pbjson::private::base64::encode(&self.comment).as_str())?; } if !self.double_quote.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("doubleQuote", pbjson::private::base64::encode(&self.double_quote).as_str())?; } if !self.newlines_in_values.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("newlinesInValues", pbjson::private::base64::encode(&self.newlines_in_values).as_str())?; } if !self.terminator.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("terminator", pbjson::private::base64::encode(&self.terminator).as_str())?; } struct_ser.end() @@ -2276,14 +2285,17 @@ impl serde::Serialize for Decimal128 { let mut struct_ser = serializer.serialize_struct("datafusion_common.Decimal128", len)?; if !self.value.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("value", pbjson::private::base64::encode(&self.value).as_str())?; } if self.p != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("p", ToString::to_string(&self.p).as_str())?; } if self.s != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("s", ToString::to_string(&self.s).as_str())?; } struct_ser.end() @@ -2410,14 +2422,17 @@ impl serde::Serialize for Decimal256 { let mut struct_ser = serializer.serialize_struct("datafusion_common.Decimal256", len)?; if !self.value.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("value", pbjson::private::base64::encode(&self.value).as_str())?; } if self.p != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("p", ToString::to_string(&self.p).as_str())?; } if self.s != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("s", ToString::to_string(&self.s).as_str())?; } struct_ser.end() @@ -3080,6 +3095,7 @@ impl serde::Serialize for Field { } if self.dict_id != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("dictId", ToString::to_string(&self.dict_id).as_str())?; } if self.dict_ordered { @@ -3484,6 +3500,7 @@ impl serde::Serialize for IntervalMonthDayNanoValue { } if self.nanos != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("nanos", ToString::to_string(&self.nanos).as_str())?; } struct_ser.end() @@ -3917,6 +3934,7 @@ impl serde::Serialize for JsonOptions { } if self.schema_infer_max_rec != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&self.schema_infer_max_rec).as_str())?; } struct_ser.end() @@ -4474,6 +4492,7 @@ impl serde::Serialize for ParquetColumnOptions { match v { parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("bloomFilterNdv", ToString::to_string(&v).as_str())?; } } @@ -4951,10 +4970,12 @@ impl serde::Serialize for ParquetOptions { } if self.data_pagesize_limit != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("dataPagesizeLimit", ToString::to_string(&self.data_pagesize_limit).as_str())?; } if self.write_batch_size != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("writeBatchSize", ToString::to_string(&self.write_batch_size).as_str())?; } if !self.writer_version.is_empty() { @@ -4965,10 +4986,12 @@ impl serde::Serialize for ParquetOptions { } if self.maximum_parallel_row_group_writers != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("maximumParallelRowGroupWriters", ToString::to_string(&self.maximum_parallel_row_group_writers).as_str())?; } if self.maximum_buffered_record_batches_per_stream != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("maximumBufferedRecordBatchesPerStream", ToString::to_string(&self.maximum_buffered_record_batches_per_stream).as_str())?; } if self.bloom_filter_on_read { @@ -4982,14 +5005,17 @@ impl serde::Serialize for ParquetOptions { } if self.dictionary_page_size_limit != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("dictionaryPageSizeLimit", ToString::to_string(&self.dictionary_page_size_limit).as_str())?; } if self.data_page_row_count_limit != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("dataPageRowCountLimit", ToString::to_string(&self.data_page_row_count_limit).as_str())?; } if self.max_row_group_size != 0 { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("maxRowGroupSize", ToString::to_string(&self.max_row_group_size).as_str())?; } if !self.created_by.is_empty() { @@ -4999,6 +5025,7 @@ impl serde::Serialize for ParquetOptions { match v { parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("metadataSizeHint", ToString::to_string(&v).as_str())?; } } @@ -5028,6 +5055,7 @@ impl serde::Serialize for ParquetOptions { match v { parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("maxStatisticsSize", ToString::to_string(&v).as_str())?; } } @@ -5036,6 +5064,7 @@ impl serde::Serialize for ParquetOptions { match v { parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("columnIndexTruncateLength", ToString::to_string(&v).as_str())?; } } @@ -5058,6 +5087,7 @@ impl serde::Serialize for ParquetOptions { match v { parquet_options::BloomFilterNdvOpt::BloomFilterNdv(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("bloomFilterNdv", ToString::to_string(&v).as_str())?; } } @@ -5140,7 +5170,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { MaximumBufferedRecordBatchesPerStream, BloomFilterOnRead, BloomFilterOnWrite, - schemaForceViewTypes, + SchemaForceViewTypes, DictionaryPageSizeLimit, DataPageRowCountLimit, MaxRowGroupSize, @@ -5188,7 +5218,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "maximumBufferedRecordBatchesPerStream" | "maximum_buffered_record_batches_per_stream" => Ok(GeneratedField::MaximumBufferedRecordBatchesPerStream), "bloomFilterOnRead" | "bloom_filter_on_read" => Ok(GeneratedField::BloomFilterOnRead), "bloomFilterOnWrite" | "bloom_filter_on_write" => Ok(GeneratedField::BloomFilterOnWrite), - "schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::schemaForceViewTypes), + "schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes), "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit), "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), @@ -5336,7 +5366,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } bloom_filter_on_write__ = Some(map_.next_value()?); } - GeneratedField::schemaForceViewTypes => { + GeneratedField::SchemaForceViewTypes => { if schema_force_view_types__.is_some() { return Err(serde::de::Error::duplicate_field("schemaForceViewTypes")); } @@ -5867,6 +5897,7 @@ impl serde::Serialize for ScalarFixedSizeBinary { let mut struct_ser = serializer.serialize_struct("datafusion_common.ScalarFixedSizeBinary", len)?; if !self.values.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("values", pbjson::private::base64::encode(&self.values).as_str())?; } if self.length != 0 { @@ -5986,10 +6017,12 @@ impl serde::Serialize for ScalarNestedValue { let mut struct_ser = serializer.serialize_struct("datafusion_common.ScalarNestedValue", len)?; if !self.ipc_message.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("ipcMessage", pbjson::private::base64::encode(&self.ipc_message).as_str())?; } if !self.arrow_data.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("arrowData", pbjson::private::base64::encode(&self.arrow_data).as_str())?; } if let Some(v) = self.schema.as_ref() { @@ -6130,10 +6163,12 @@ impl serde::Serialize for scalar_nested_value::Dictionary { let mut struct_ser = serializer.serialize_struct("datafusion_common.ScalarNestedValue.Dictionary", len)?; if !self.ipc_message.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("ipcMessage", pbjson::private::base64::encode(&self.ipc_message).as_str())?; } if !self.arrow_data.is_empty() { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("arrowData", pbjson::private::base64::encode(&self.arrow_data).as_str())?; } struct_ser.end() @@ -6354,10 +6389,12 @@ impl serde::Serialize for ScalarTime64Value { match v { scalar_time64_value::Value::Time64MicrosecondValue(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("time64MicrosecondValue", ToString::to_string(&v).as_str())?; } scalar_time64_value::Value::Time64NanosecondValue(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("time64NanosecondValue", ToString::to_string(&v).as_str())?; } } @@ -6471,18 +6508,22 @@ impl serde::Serialize for ScalarTimestampValue { match v { scalar_timestamp_value::Value::TimeMicrosecondValue(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("timeMicrosecondValue", ToString::to_string(&v).as_str())?; } scalar_timestamp_value::Value::TimeNanosecondValue(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("timeNanosecondValue", ToString::to_string(&v).as_str())?; } scalar_timestamp_value::Value::TimeSecondValue(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("timeSecondValue", ToString::to_string(&v).as_str())?; } scalar_timestamp_value::Value::TimeMillisecondValue(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("timeMillisecondValue", ToString::to_string(&v).as_str())?; } } @@ -6645,6 +6686,7 @@ impl serde::Serialize for ScalarValue { } scalar_value::Value::Int64Value(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("int64Value", ToString::to_string(&v).as_str())?; } scalar_value::Value::Uint8Value(v) => { @@ -6658,6 +6700,7 @@ impl serde::Serialize for ScalarValue { } scalar_value::Value::Uint64Value(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("uint64Value", ToString::to_string(&v).as_str())?; } scalar_value::Value::Float32Value(v) => { @@ -6695,6 +6738,7 @@ impl serde::Serialize for ScalarValue { } scalar_value::Value::Date64Value(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("date64Value", ToString::to_string(&v).as_str())?; } scalar_value::Value::IntervalYearmonthValue(v) => { @@ -6702,18 +6746,22 @@ impl serde::Serialize for ScalarValue { } scalar_value::Value::DurationSecondValue(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("durationSecondValue", ToString::to_string(&v).as_str())?; } scalar_value::Value::DurationMillisecondValue(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("durationMillisecondValue", ToString::to_string(&v).as_str())?; } scalar_value::Value::DurationMicrosecondValue(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("durationMicrosecondValue", ToString::to_string(&v).as_str())?; } scalar_value::Value::DurationNanosecondValue(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("durationNanosecondValue", ToString::to_string(&v).as_str())?; } scalar_value::Value::TimestampValue(v) => { @@ -6724,14 +6772,17 @@ impl serde::Serialize for ScalarValue { } scalar_value::Value::BinaryValue(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("binaryValue", pbjson::private::base64::encode(&v).as_str())?; } scalar_value::Value::LargeBinaryValue(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("largeBinaryValue", pbjson::private::base64::encode(&v).as_str())?; } scalar_value::Value::BinaryViewValue(v) => { #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("binaryViewValue", pbjson::private::base64::encode(&v).as_str())?; } scalar_value::Value::Time64Value(v) => { diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index d6f982278d67..be12d5b8e30e 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -45,10 +45,10 @@ pub struct ParquetFormat { pub options: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct AvroFormat {} #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct NdJsonFormat { #[prost(message, optional, tag = "1")] pub options: ::core::option::Option, @@ -89,10 +89,10 @@ pub struct Constraints { pub constraints: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct AvroOptions {} #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ArrowOptions {} #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -137,7 +137,7 @@ pub struct Timestamp { pub timezone: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Decimal { #[prost(uint32, tag = "3")] pub precision: u32, @@ -145,7 +145,7 @@ pub struct Decimal { pub scale: i32, } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Decimal256Type { #[prost(uint32, tag = "3")] pub precision: u32, @@ -223,7 +223,7 @@ pub mod scalar_nested_value { } } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ScalarTime32Value { #[prost(oneof = "scalar_time32_value::Value", tags = "1, 2")] pub value: ::core::option::Option, @@ -231,7 +231,7 @@ pub struct ScalarTime32Value { /// Nested message and enum types in `ScalarTime32Value`. pub mod scalar_time32_value { #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum Value { #[prost(int32, tag = "1")] Time32SecondValue(i32), @@ -240,7 +240,7 @@ pub mod scalar_time32_value { } } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ScalarTime64Value { #[prost(oneof = "scalar_time64_value::Value", tags = "1, 2")] pub value: ::core::option::Option, @@ -248,7 +248,7 @@ pub struct ScalarTime64Value { /// Nested message and enum types in `ScalarTime64Value`. pub mod scalar_time64_value { #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum Value { #[prost(int64, tag = "1")] Time64MicrosecondValue(i64), @@ -267,7 +267,7 @@ pub struct ScalarTimestampValue { /// Nested message and enum types in `ScalarTimestampValue`. pub mod scalar_timestamp_value { #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum Value { #[prost(int64, tag = "1")] TimeMicrosecondValue(i64), @@ -288,7 +288,7 @@ pub struct ScalarDictionaryValue { pub value: ::core::option::Option<::prost::alloc::boxed::Box>, } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct IntervalDayTimeValue { #[prost(int32, tag = "1")] pub days: i32, @@ -296,7 +296,7 @@ pub struct IntervalDayTimeValue { pub milliseconds: i32, } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct IntervalMonthDayNanoValue { #[prost(int32, tag = "1")] pub months: i32, @@ -558,10 +558,10 @@ pub mod arrow_type { /// } /// } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct EmptyMessage {} #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct JsonWriterOptions { #[prost(enumeration = "CompressionTypeVariant", tag = "1")] pub compression: i32, @@ -658,7 +658,7 @@ pub struct CsvOptions { } /// Options controlling CSV format #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct JsonOptions { /// Compression type #[prost(enumeration = "CompressionTypeVariant", tag = "1")] @@ -723,7 +723,7 @@ pub struct ParquetColumnOptions { /// Nested message and enum types in `ParquetColumnOptions`. pub mod parquet_column_options { #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum BloomFilterEnabledOpt { #[prost(bool, tag = "1")] BloomFilterEnabled(bool), @@ -735,7 +735,7 @@ pub mod parquet_column_options { Encoding(::prost::alloc::string::String), } #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum DictionaryEnabledOpt { #[prost(bool, tag = "3")] DictionaryEnabled(bool), @@ -753,19 +753,19 @@ pub mod parquet_column_options { StatisticsEnabled(::prost::alloc::string::String), } #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum BloomFilterFppOpt { #[prost(double, tag = "6")] BloomFilterFpp(f64), } #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum BloomFilterNdvOpt { #[prost(uint64, tag = "7")] BloomFilterNdv(u64), } #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum MaxStatisticsSizeOpt { #[prost(uint32, tag = "8")] MaxStatisticsSize(u32), @@ -860,7 +860,7 @@ pub struct ParquetOptions { /// Nested message and enum types in `ParquetOptions`. pub mod parquet_options { #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum MetadataSizeHintOpt { #[prost(uint64, tag = "4")] MetadataSizeHint(u64), @@ -872,7 +872,7 @@ pub mod parquet_options { Compression(::prost::alloc::string::String), } #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum DictionaryEnabledOpt { #[prost(bool, tag = "11")] DictionaryEnabled(bool), @@ -884,13 +884,13 @@ pub mod parquet_options { StatisticsEnabled(::prost::alloc::string::String), } #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum MaxStatisticsSizeOpt { #[prost(uint64, tag = "14")] MaxStatisticsSize(u64), } #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum ColumnIndexTruncateLengthOpt { #[prost(uint64, tag = "17")] ColumnIndexTruncateLength(u64), @@ -902,13 +902,13 @@ pub mod parquet_options { Encoding(::prost::alloc::string::String), } #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum BloomFilterFppOpt { #[prost(double, tag = "21")] BloomFilterFpp(f64), } #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum BloomFilterNdvOpt { #[prost(uint64, tag = "22")] BloomFilterNdv(u64), diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 645df14a0337..05ac19be223b 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -59,6 +59,11 @@ message LogicalPlanNode { DistinctOnNode distinct_on = 28; CopyToNode copy_to = 29; UnnestNode unnest = 30; + // Gap in numeration here to accommodate changes in upstream + // and make it easier to pull changes in the future + // RecursiveQueryNode recursive_query = 31; + // CteWorkTableScanNode cte_work_table_scan = 32; + DmlNode dml = 33; } } @@ -260,6 +265,22 @@ message CopyToNode { repeated string partition_by = 7; } +message DmlNode{ + enum Type { + UPDATE = 0; + DELETE = 1; + CTAS = 2; + INSERT_APPEND = 3; + INSERT_OVERWRITE = 4; + INSERT_REPLACE = 5; + + } + Type dml_type = 1; + LogicalPlanNode input = 2; + TableReference table_name = 3; + LogicalPlanNode target = 5; +} + message UnnestNode { LogicalPlanNode input = 1; repeated datafusion_common.Column exec_columns = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index e493f761b51f..88986e2399bb 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -4390,6 +4390,235 @@ impl<'de> serde::Deserialize<'de> for DistinctOnNode { deserializer.deserialize_struct("datafusion.DistinctOnNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for DmlNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.dml_type != 0 { + len += 1; + } + if self.input.is_some() { + len += 1; + } + if self.table_name.is_some() { + len += 1; + } + if self.target.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.DmlNode", len)?; + if self.dml_type != 0 { + let v = dml_node::Type::try_from(self.dml_type) + .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.dml_type)))?; + struct_ser.serialize_field("dmlType", &v)?; + } + if let Some(v) = self.input.as_ref() { + struct_ser.serialize_field("input", v)?; + } + if let Some(v) = self.table_name.as_ref() { + struct_ser.serialize_field("tableName", v)?; + } + if let Some(v) = self.target.as_ref() { + struct_ser.serialize_field("target", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for DmlNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "dml_type", + "dmlType", + "input", + "table_name", + "tableName", + "target", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + DmlType, + Input, + TableName, + Target, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "dmlType" | "dml_type" => Ok(GeneratedField::DmlType), + "input" => Ok(GeneratedField::Input), + "tableName" | "table_name" => Ok(GeneratedField::TableName), + "target" => Ok(GeneratedField::Target), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = DmlNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.DmlNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut dml_type__ = None; + let mut input__ = None; + let mut table_name__ = None; + let mut target__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::DmlType => { + if dml_type__.is_some() { + return Err(serde::de::Error::duplicate_field("dmlType")); + } + dml_type__ = Some(map_.next_value::()? as i32); + } + GeneratedField::Input => { + if input__.is_some() { + return Err(serde::de::Error::duplicate_field("input")); + } + input__ = map_.next_value()?; + } + GeneratedField::TableName => { + if table_name__.is_some() { + return Err(serde::de::Error::duplicate_field("tableName")); + } + table_name__ = map_.next_value()?; + } + GeneratedField::Target => { + if target__.is_some() { + return Err(serde::de::Error::duplicate_field("target")); + } + target__ = map_.next_value()?; + } + } + } + Ok(DmlNode { + dml_type: dml_type__.unwrap_or_default(), + input: input__, + table_name: table_name__, + target: target__, + }) + } + } + deserializer.deserialize_struct("datafusion.DmlNode", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for dml_node::Type { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let variant = match self { + Self::Update => "UPDATE", + Self::Delete => "DELETE", + Self::Ctas => "CTAS", + Self::InsertAppend => "INSERT_APPEND", + Self::InsertOverwrite => "INSERT_OVERWRITE", + Self::InsertReplace => "INSERT_REPLACE", + }; + serializer.serialize_str(variant) + } +} +impl<'de> serde::Deserialize<'de> for dml_node::Type { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "UPDATE", + "DELETE", + "CTAS", + "INSERT_APPEND", + "INSERT_OVERWRITE", + "INSERT_REPLACE", + ]; + + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = dml_node::Type; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) + }) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "UPDATE" => Ok(dml_node::Type::Update), + "DELETE" => Ok(dml_node::Type::Delete), + "CTAS" => Ok(dml_node::Type::Ctas), + "INSERT_APPEND" => Ok(dml_node::Type::InsertAppend), + "INSERT_OVERWRITE" => Ok(dml_node::Type::InsertOverwrite), + "INSERT_REPLACE" => Ok(dml_node::Type::InsertReplace), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} impl serde::Serialize for DropViewNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -10086,6 +10315,9 @@ impl serde::Serialize for LogicalPlanNode { logical_plan_node::LogicalPlanType::Unnest(v) => { struct_ser.serialize_field("unnest", v)?; } + logical_plan_node::LogicalPlanType::Dml(v) => { + struct_ser.serialize_field("dml", v)?; + } } } struct_ser.end() @@ -10140,6 +10372,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode { "copy_to", "copyTo", "unnest", + "dml", ]; #[allow(clippy::enum_variant_names)] @@ -10173,6 +10406,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode { DistinctOn, CopyTo, Unnest, + Dml, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -10223,6 +10457,7 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode { "distinctOn" | "distinct_on" => Ok(GeneratedField::DistinctOn), "copyTo" | "copy_to" => Ok(GeneratedField::CopyTo), "unnest" => Ok(GeneratedField::Unnest), + "dml" => Ok(GeneratedField::Dml), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -10446,6 +10681,13 @@ impl<'de> serde::Deserialize<'de> for LogicalPlanNode { return Err(serde::de::Error::duplicate_field("unnest")); } logical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::Unnest) +; + } + GeneratedField::Dml => { + if logical_plan_type__.is_some() { + return Err(serde::de::Error::duplicate_field("dml")); + } + logical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(logical_plan_node::LogicalPlanType::Dml) ; } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 1d086a610ce4..8b690d4b5964 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -6,7 +6,7 @@ pub struct LogicalPlanNode { #[prost( oneof = "logical_plan_node::LogicalPlanType", - tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30" + tags = "1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 33" )] pub logical_plan_type: ::core::option::Option, } @@ -73,6 +73,8 @@ pub mod logical_plan_node { CopyTo(::prost::alloc::boxed::Box), #[prost(message, tag = "30")] Unnest(::prost::alloc::boxed::Box), + #[prost(message, tag = "33")] + Dml(::prost::alloc::boxed::Box), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -426,6 +428,69 @@ pub struct CopyToNode { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct DmlNode { + #[prost(enumeration = "dml_node::Type", tag = "1")] + pub dml_type: i32, + #[prost(message, optional, boxed, tag = "2")] + pub input: ::core::option::Option<::prost::alloc::boxed::Box>, + #[prost(message, optional, tag = "3")] + pub table_name: ::core::option::Option, + #[prost(message, optional, boxed, tag = "5")] + pub target: ::core::option::Option<::prost::alloc::boxed::Box>, +} +/// Nested message and enum types in `DmlNode`. +pub mod dml_node { + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum Type { + Update = 0, + Delete = 1, + Ctas = 2, + InsertAppend = 3, + InsertOverwrite = 4, + InsertReplace = 5, + } + impl Type { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Type::Update => "UPDATE", + Type::Delete => "DELETE", + Type::Ctas => "CTAS", + Type::InsertAppend => "INSERT_APPEND", + Type::InsertOverwrite => "INSERT_OVERWRITE", + Type::InsertReplace => "INSERT_REPLACE", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "UPDATE" => Some(Self::Update), + "DELETE" => Some(Self::Delete), + "CTAS" => Some(Self::Ctas), + "INSERT_APPEND" => Some(Self::InsertAppend), + "INSERT_OVERWRITE" => Some(Self::InsertOverwrite), + "INSERT_REPLACE" => Some(Self::InsertReplace), + _ => None, + } + } + } +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct UnnestNode { #[prost(message, optional, boxed, tag = "1")] pub input: ::core::option::Option<::prost::alloc::boxed::Box>, diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 893255ccc8ce..75ee66fa5c50 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -24,7 +24,6 @@ use datafusion_common::{ }; use datafusion_expr::expr::{Alias, Placeholder, Sort}; use datafusion_expr::expr::{Unnest, WildcardOptions}; -use datafusion_expr::ExprFunctionExt; use datafusion_expr::{ expr::{self, InList, WindowFunction}, logical_plan::{PlanType, StringifiedPlan}, @@ -33,6 +32,7 @@ use datafusion_expr::{ JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits, }; +use datafusion_expr::{ExprFunctionExt, WriteOp}; use datafusion_proto_common::{from_proto::FromOptionalField, FromProtoError as Error}; use crate::protobuf::plan_type::PlanTypeEnum::{ @@ -224,6 +224,19 @@ impl From for JoinConstraint { } } +impl From for WriteOp { + fn from(t: protobuf::dml_node::Type) -> Self { + match t { + protobuf::dml_node::Type::Update => WriteOp::Update, + protobuf::dml_node::Type::Delete => WriteOp::Delete, + protobuf::dml_node::Type::InsertAppend => WriteOp::InsertInto, + protobuf::dml_node::Type::InsertOverwrite => WriteOp::InsertOverwrite, + protobuf::dml_node::Type::InsertReplace => WriteOp::InsertOverwrite, + protobuf::dml_node::Type::Ctas => WriteOp::Ctas, + } + } +} + pub fn parse_expr( proto: &protobuf::LogicalExprNode, registry: &dyn FunctionRegistry, diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index bf5394ec01de..912ec20b3a2c 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -29,7 +29,7 @@ use crate::{ }, }; -use crate::protobuf::{proto_error, ToProtoError}; +use crate::protobuf::{dml_node, proto_error, DmlNode, ToProtoError}; use arrow::datatypes::{DataType, Schema, SchemaRef}; #[cfg(feature = "parquet")] use datafusion::datasource::file_format::parquet::ParquetFormat; @@ -51,8 +51,8 @@ use datafusion::{ }; use datafusion_common::file_options::file_type::FileType; use datafusion_common::{ - context, internal_datafusion_err, internal_err, not_impl_err, DataFusionError, - Result, TableReference, + context, internal_datafusion_err, internal_err, not_impl_err, plan_err, + DataFusionError, Result, TableReference, ToDFSchema, }; use datafusion_expr::{ dml, @@ -65,7 +65,7 @@ use datafusion_expr::{ DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr, WindowUDF, }; -use datafusion_expr::{AggregateUDF, Unnest}; +use datafusion_expr::{AggregateUDF, DmlStatement, TableSource, Unnest}; use self::to_proto::{serialize_expr, serialize_exprs}; use crate::logical_plan::to_proto::serialize_sorts; @@ -229,6 +229,45 @@ fn from_table_reference( Ok(table_ref.clone().try_into()?) } +/// Converts [LogicalPlan::TableScan] to [TableSource] +/// method to be used to deserialize nodes +/// serialized by [from_table_source] +fn to_table_source( + node: &Option>, + ctx: &SessionContext, + extension_codec: &dyn LogicalExtensionCodec, +) -> Result> { + if let Some(node) = node { + match node.try_into_logical_plan(ctx, extension_codec)? { + LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source), + _ => plan_err!("expected TableScan node"), + } + } else { + plan_err!("LogicalPlanNode should be provided") + } +} + +/// converts [TableSource] to [LogicalPlan::TableScan] +/// using [LogicalPlan::TableScan] was the best approach to +/// serialize [TableSource] to [LogicalPlan::TableScan] +fn from_table_source( + table_name: TableReference, + target: Arc, + extension_codec: &dyn LogicalExtensionCodec, +) -> Result { + let projected_schema = target.schema().to_dfschema_ref()?; + let r = LogicalPlan::TableScan(TableScan { + table_name, + source: target, + projection: None, + projected_schema, + filters: vec![], + fetch: None, + }); + + LogicalPlanNode::try_from_logical_plan(&r, extension_codec) +} + impl AsLogicalPlan for LogicalPlanNode { fn try_decode(buf: &[u8]) -> Result where @@ -880,6 +919,14 @@ impl AsLogicalPlan for LogicalPlanNode { options: into_required!(unnest.options)?, })) } + LogicalPlanType::Dml(dml_node) => Ok(LogicalPlan::Dml( + datafusion::logical_expr::DmlStatement::new( + from_table_reference(dml_node.table_name.as_ref(), "DML ")?, + to_table_source(&dml_node.target, ctx, extension_codec)?, + dml_node.dml_type().into(), + Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?), + ), + )), } } @@ -1055,15 +1102,14 @@ impl AsLogicalPlan for LogicalPlanNode { } } LogicalPlan::Projection(Projection { expr, input, .. }) => { + let input = protobuf::LogicalPlanNode::try_from_logical_plan( + input.as_ref(), + extension_codec, + )?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Projection(Box::new( protobuf::ProjectionNode { - input: Some(Box::new( - protobuf::LogicalPlanNode::try_from_logical_plan( - input.as_ref(), - extension_codec, - )?, - )), + input: Some(Box::new(input)), expr: serialize_exprs(expr, extension_codec)?, optional_alias: None, }, @@ -1593,9 +1639,29 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::Statement(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for Statement", )), - LogicalPlan::Dml(_) => Err(proto_error( - "LogicalPlan serde is not yet implemented for Dml", - )), + LogicalPlan::Dml(DmlStatement { + table_name, + dst, + op, + input, + .. + }) => { + let input = + LogicalPlanNode::try_from_logical_plan(input, extension_codec)?; + let dml_type: dml_node::Type = op.into(); + Ok(LogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode { + input: Some(Box::new(input)), + target: Some(Box::new(from_table_source( + table_name.clone(), + Arc::clone(dst), + extension_codec, + )?)), + table_name: Some(table_name.clone().into()), + dml_type: dml_type.into(), + }))), + }) + } LogicalPlan::Copy(dml::CopyTo { input, output_url, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 63d1a007c1e5..91f787656e21 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -24,6 +24,7 @@ use datafusion_expr::expr::{ self, Alias, Between, BinaryExpr, Cast, GroupingSet, InList, Like, Placeholder, ScalarFunction, Unnest, }; +use datafusion_expr::WriteOp; use datafusion_expr::{ logical_plan::PlanType, logical_plan::StringifiedPlan, BuiltInWindowFunction, Expr, JoinConstraint, JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound, @@ -700,3 +701,15 @@ impl From for protobuf::JoinConstraint { } } } + +impl From<&WriteOp> for protobuf::dml_node::Type { + fn from(t: &WriteOp) -> Self { + match t { + WriteOp::InsertOverwrite => protobuf::dml_node::Type::InsertOverwrite, + WriteOp::InsertInto => protobuf::dml_node::Type::InsertAppend, + WriteOp::Delete => protobuf::dml_node::Type::Delete, + WriteOp::Update => protobuf::dml_node::Type::Update, + WriteOp::Ctas => protobuf::dml_node::Type::Ctas, + } + } +} diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index c892f1a76750..2b6f2c85f6c7 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -340,6 +340,46 @@ async fn roundtrip_logical_plan_aggregation() -> Result<()> { Ok(()) } +#[tokio::test] +async fn roundtrip_logical_plan_dml() -> Result<()> { + let ctx = SessionContext::new(); + let schema = Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Decimal128(15, 2), true), + ]); + + ctx.register_csv( + "t1", + "tests/testdata/test.csv", + CsvReadOptions::default().schema(&schema), + ) + .await?; + let queries = [ + "INSERT INTO T1 VALUES (1, null)", + "INSERT OVERWRITE T1 VALUES (1, null)", + // logical plans with REPLACE INTO are not supported yet + // TODO: add REPLACE INTO support + // "REPLACE INTO T1 VALUES (1, null)", + // "INSERT OR REPLACE INTO T1 VALUES (1, null)", + "DELETE FROM T1", + "UPDATE T1 SET a = 1", + "CREATE TABLE T2 AS SELECT * FROM T1", + ]; + for query in queries { + let plan = ctx.sql(query).await?.into_optimized_plan()?; + let bytes = logical_plan_to_bytes(&plan)?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; + assert_eq!( + format!("{plan}"), + format!("{logical_round_trip}"), + "failed query roundtrip: {}", + query + ); + } + + Ok(()) +} + #[tokio::test] async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> { let ctx = SessionContext::new();