diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index b1aee41978c2..bbf3ee28db18 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -391,11 +391,33 @@ impl DFSchema { }) } + /// Returns true if the two schemas have the same qualified named + /// fields with logically equivalent data types. Returns false otherwise. + /// + /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type + /// equivalence checking. + pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool { + if self.fields().len() != other.fields().len() { + return false; + } + let self_fields = self.fields().iter(); + let other_fields = other.fields().iter(); + self_fields.zip(other_fields).all(|(f1, f2)| { + f1.qualifier() == f2.qualifier() + && f1.name() == f2.name() + && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type()) + }) + } + /// Returns true if the two schemas have the same qualified named /// fields with the same data types. Returns false otherwise. /// /// This is a specialized version of Eq that ignores differences /// in nullability and metadata. + /// + /// Use [DFSchema]::logically_equivalent_names_and_types for a weaker + /// logical type checking, which for example would consider a dictionary + /// encoded UTF8 array to be equivalent to a plain UTF8 array. pub fn equivalent_names_and_types(&self, other: &Self) -> bool { if self.fields().len() != other.fields().len() { return false; @@ -409,6 +431,46 @@ impl DFSchema { }) } + /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint + /// than datatype_is_semantically_equal in that a Dictionary type is logically + /// equal to a plain V type, but not semantically equal. Dictionary is also + /// logically equal to Dictionary. + fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool { + // check nested fields + match (dt1, dt2) { + (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => { + v1.as_ref() == v2.as_ref() + } + (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype, + (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype, + (DataType::List(f1), DataType::List(f2)) + | (DataType::LargeList(f1), DataType::LargeList(f2)) + | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) + | (DataType::Map(f1, _), DataType::Map(f2, _)) => { + Self::field_is_logically_equal(f1, f2) + } + (DataType::Struct(fields1), DataType::Struct(fields2)) => { + let iter1 = fields1.iter(); + let iter2 = fields2.iter(); + fields1.len() == fields2.len() && + // all fields have to be the same + iter1 + .zip(iter2) + .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2)) + } + (DataType::Union(fields1, _), DataType::Union(fields2, _)) => { + let iter1 = fields1.iter(); + let iter2 = fields2.iter(); + fields1.len() == fields2.len() && + // all fields have to be the same + iter1 + .zip(iter2) + .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2)) + } + _ => dt1 == dt2, + } + } + /// Returns true of two [`DataType`]s are semantically equal (same /// name and type), ignoring both metadata and nullability. /// @@ -448,6 +510,11 @@ impl DFSchema { } } + fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool { + f1.name() == f2.name() + && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type()) + } + fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool { f1.name() == f2.name() && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type()) @@ -778,6 +845,13 @@ pub trait SchemaExt { /// /// It works the same as [`DFSchema::equivalent_names_and_types`]. fn equivalent_names_and_types(&self, other: &Self) -> bool; + + /// Returns true if the two schemas have the same qualified named + /// fields with logically equivalent data types. Returns false otherwise. + /// + /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type + /// equivalence checking. + fn logically_equivalent_names_and_types(&self, other: &Self) -> bool; } impl SchemaExt for Schema { @@ -797,6 +871,23 @@ impl SchemaExt for Schema { ) }) } + + fn logically_equivalent_names_and_types(&self, other: &Self) -> bool { + if self.fields().len() != other.fields().len() { + return false; + } + + self.fields() + .iter() + .zip(other.fields().iter()) + .all(|(f1, f2)| { + f1.name() == f2.name() + && DFSchema::datatype_is_logically_equal( + f1.data_type(), + f2.data_type(), + ) + }) + } } #[cfg(test)] diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs b/datafusion/core/src/datasource/file_format/write/demux.rs index 2c44c0922c76..7c00edb610c4 100644 --- a/datafusion/core/src/datasource/file_format/write/demux.rs +++ b/datafusion/core/src/datasource/file_format/write/demux.rs @@ -29,7 +29,7 @@ use crate::physical_plan::SendableRecordBatchStream; use arrow_array::builder::UInt64Builder; use arrow_array::cast::AsArray; -use arrow_array::{RecordBatch, StructArray}; +use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray, StructArray}; use arrow_schema::{DataType, Schema}; use datafusion_common::cast::as_string_array; use datafusion_common::DataFusionError; @@ -311,6 +311,22 @@ fn compute_partition_keys_by_row<'a>( partition_values.push(array.value(i)); } } + DataType::Dictionary(_, _) => { + downcast_dictionary_array!( + col_array => { + let array = col_array.downcast_dict::() + .ok_or(DataFusionError::Execution(format!("it is not yet supported to write to hive partitions with datatype {}", + dtype)))?; + + for val in array.values() { + partition_values.push( + val.ok_or(DataFusionError::Execution(format!("Cannot partition by null value for column {}", col)))? + ); + } + }, + _ => unreachable!(), + ) + } _ => { return Err(DataFusionError::NotImplemented(format!( "it is not yet supported to write to hive partitions with datatype {}", diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index ae6aa317ce85..60b2cc4149b9 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -830,7 +830,10 @@ impl TableProvider for ListingTable { overwrite: bool, ) -> Result> { // Check that the schema of the plan matches the schema of this table. - if !self.schema().equivalent_names_and_types(&input.schema()) { + if !self + .schema() + .logically_equivalent_names_and_types(&input.schema()) + { return plan_err!( // Return an error if schema of the input query does not match with the table schema. "Inserting query must have the same schema with the table." diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index a2f8e225e121..6bcaa97a408f 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -209,7 +209,10 @@ impl TableProvider for MemTable { ) -> Result> { // Create a physical plan from the logical plan. // Check that the schema of the plan matches the schema of this table. - if !self.schema().equivalent_names_and_types(&input.schema()) { + if !self + .schema() + .logically_equivalent_names_and_types(&input.schema()) + { return plan_err!( "Inserting query must have the same schema with the table." ); diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index b2206e987864..8b01a14568e7 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -40,8 +40,44 @@ STORED AS CSV WITH HEADER ROW LOCATION '../../testing/data/csv/aggregate_test_100.csv' -# test_insert_into +statement ok +create table dictionary_encoded_values as values +('a', arrow_cast('foo', 'Dictionary(Int32, Utf8)')), ('b', arrow_cast('bar', 'Dictionary(Int32, Utf8)')); + +query TTT +describe dictionary_encoded_values; +---- +column1 Utf8 YES +column2 Dictionary(Int32, Utf8) YES + +statement ok +CREATE EXTERNAL TABLE dictionary_encoded_parquet_partitioned( + a varchar, + b varchar, +) +STORED AS parquet +LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned' +PARTITIONED BY (b) +OPTIONS( +create_local_path 'true', +insert_mode 'append_new_files', +); + +query TT +insert into dictionary_encoded_parquet_partitioned +select * from dictionary_encoded_values +---- +2 + +query TT +select * from dictionary_encoded_parquet_partitioned order by (a); +---- +a foo +b bar + + +# test_insert_into statement ok set datafusion.execution.target_partitions = 8;