@@ -21,6 +21,7 @@ use std::sync::Arc;
2121
2222use arrow:: datatypes:: SchemaRef ;
2323use arrow:: record_batch:: RecordBatch ;
24+ use arrow_array:: Array ;
2425use futures:: stream:: { StreamExt , TryStreamExt } ;
2526use tokio:: task:: JoinSet ;
2627
@@ -852,7 +853,7 @@ pub fn execute_input_stream(
852853 Ok ( Box :: pin ( RecordBatchStreamAdapter :: new (
853854 sink_schema,
854855 input_stream
855- . map ( move |batch| check_not_null_contraits ( batch?, & risky_columns) ) ,
856+ . map ( move |batch| check_not_null_constraints ( batch?, & risky_columns) ) ,
856857 ) ) )
857858 }
858859}
@@ -872,7 +873,7 @@ pub fn execute_input_stream(
872873/// This function iterates over the specified column indices and ensures that none
873874/// of the columns contain null values. If any column contains null values, an error
874875/// is returned.
875- pub fn check_not_null_contraits (
876+ pub fn check_not_null_constraints (
876877 batch : RecordBatch ,
877878 column_indices : & Vec < usize > ,
878879) -> Result < RecordBatch > {
@@ -885,7 +886,13 @@ pub fn check_not_null_contraits(
885886 ) ;
886887 }
887888
888- if batch. column ( index) . null_count ( ) > 0 {
889+ if batch
890+ . column ( index)
891+ . logical_nulls ( )
892+ . map ( |nulls| nulls. null_count ( ) )
893+ . unwrap_or_default ( )
894+ > 0
895+ {
889896 return exec_err ! (
890897 "Invalid batch column at '{}' has null but schema specifies non-nullable" ,
891898 index
@@ -920,11 +927,11 @@ pub enum CardinalityEffect {
920927#[ cfg( test) ]
921928mod tests {
922929 use super :: * ;
930+ use arrow_array:: { DictionaryArray , Int32Array , NullArray , RunArray } ;
931+ use arrow_schema:: { DataType , Field , Schema , SchemaRef } ;
923932 use std:: any:: Any ;
924933 use std:: sync:: Arc ;
925934
926- use arrow_schema:: { Schema , SchemaRef } ;
927-
928935 use datafusion_common:: { Result , Statistics } ;
929936 use datafusion_execution:: { SendableRecordBatchStream , TaskContext } ;
930937
@@ -1068,6 +1075,136 @@ mod tests {
10681075 fn use_execution_plan_as_trait_object ( plan : & dyn ExecutionPlan ) {
10691076 let _ = plan. name ( ) ;
10701077 }
1071- }
10721078
1073- // pub mod test;
1079+ #[ test]
1080+ fn test_check_not_null_constraints_accept_non_null ( ) -> Result < ( ) > {
1081+ check_not_null_constraints (
1082+ RecordBatch :: try_new (
1083+ Arc :: new ( Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Int32 , true ) ] ) ) ,
1084+ vec ! [ Arc :: new( Int32Array :: from( vec![ Some ( 1 ) , Some ( 2 ) , Some ( 3 ) ] ) ) ] ,
1085+ ) ?,
1086+ & vec ! [ 0 ] ,
1087+ ) ?;
1088+ Ok ( ( ) )
1089+ }
1090+
1091+ #[ test]
1092+ fn test_check_not_null_constraints_reject_null ( ) -> Result < ( ) > {
1093+ let result = check_not_null_constraints (
1094+ RecordBatch :: try_new (
1095+ Arc :: new ( Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Int32 , true ) ] ) ) ,
1096+ vec ! [ Arc :: new( Int32Array :: from( vec![ Some ( 1 ) , None , Some ( 3 ) ] ) ) ] ,
1097+ ) ?,
1098+ & vec ! [ 0 ] ,
1099+ ) ;
1100+ assert ! ( result. is_err( ) ) ;
1101+ assert_starts_with (
1102+ result. err ( ) . unwrap ( ) . message ( ) . as_ref ( ) ,
1103+ "Invalid batch column at '0' has null but schema specifies non-nullable" ,
1104+ ) ;
1105+ Ok ( ( ) )
1106+ }
1107+
1108+ #[ test]
1109+ fn test_check_not_null_constraints_with_run_end_array ( ) -> Result < ( ) > {
1110+ // some null value inside REE array
1111+ let run_ends = Int32Array :: from ( vec ! [ 1 , 2 , 3 , 4 ] ) ;
1112+ let values = Int32Array :: from ( vec ! [ Some ( 0 ) , None , Some ( 1 ) , None ] ) ;
1113+ let run_end_array = RunArray :: try_new ( & run_ends, & values) ?;
1114+ let result = check_not_null_constraints (
1115+ RecordBatch :: try_new (
1116+ Arc :: new ( Schema :: new ( vec ! [ Field :: new(
1117+ "a" ,
1118+ run_end_array. data_type( ) . to_owned( ) ,
1119+ true ,
1120+ ) ] ) ) ,
1121+ vec ! [ Arc :: new( run_end_array) ] ,
1122+ ) ?,
1123+ & vec ! [ 0 ] ,
1124+ ) ;
1125+ assert ! ( result. is_err( ) ) ;
1126+ assert_starts_with (
1127+ result. err ( ) . unwrap ( ) . message ( ) . as_ref ( ) ,
1128+ "Invalid batch column at '0' has null but schema specifies non-nullable" ,
1129+ ) ;
1130+ Ok ( ( ) )
1131+ }
1132+
1133+ #[ test]
1134+ fn test_check_not_null_constraints_with_dictionary_array_with_null ( ) -> Result < ( ) > {
1135+ let values = Arc :: new ( Int32Array :: from ( vec ! [ Some ( 1 ) , None , Some ( 3 ) , Some ( 4 ) ] ) ) ;
1136+ let keys = Int32Array :: from ( vec ! [ 0 , 1 , 2 , 3 ] ) ;
1137+ let dictionary = DictionaryArray :: new ( keys, values) ;
1138+ let result = check_not_null_constraints (
1139+ RecordBatch :: try_new (
1140+ Arc :: new ( Schema :: new ( vec ! [ Field :: new(
1141+ "a" ,
1142+ dictionary. data_type( ) . to_owned( ) ,
1143+ true ,
1144+ ) ] ) ) ,
1145+ vec ! [ Arc :: new( dictionary) ] ,
1146+ ) ?,
1147+ & vec ! [ 0 ] ,
1148+ ) ;
1149+ assert ! ( result. is_err( ) ) ;
1150+ assert_starts_with (
1151+ result. err ( ) . unwrap ( ) . message ( ) . as_ref ( ) ,
1152+ "Invalid batch column at '0' has null but schema specifies non-nullable" ,
1153+ ) ;
1154+ Ok ( ( ) )
1155+ }
1156+
1157+ #[ test]
1158+ fn test_check_not_null_constraints_with_dictionary_masking_null ( ) -> Result < ( ) > {
1159+ // some null value marked out by dictionary array
1160+ let values = Arc :: new ( Int32Array :: from ( vec ! [
1161+ Some ( 1 ) ,
1162+ None , // this null value is masked by dictionary keys
1163+ Some ( 3 ) ,
1164+ Some ( 4 ) ,
1165+ ] ) ) ;
1166+ let keys = Int32Array :: from ( vec ! [ 0 , /*1,*/ 2 , 3 ] ) ;
1167+ let dictionary = DictionaryArray :: new ( keys, values) ;
1168+ check_not_null_constraints (
1169+ RecordBatch :: try_new (
1170+ Arc :: new ( Schema :: new ( vec ! [ Field :: new(
1171+ "a" ,
1172+ dictionary. data_type( ) . to_owned( ) ,
1173+ true ,
1174+ ) ] ) ) ,
1175+ vec ! [ Arc :: new( dictionary) ] ,
1176+ ) ?,
1177+ & vec ! [ 0 ] ,
1178+ ) ?;
1179+ Ok ( ( ) )
1180+ }
1181+
1182+ #[ test]
1183+ fn test_check_not_null_constraints_on_null_type ( ) -> Result < ( ) > {
1184+ // null value of Null type
1185+ let result = check_not_null_constraints (
1186+ RecordBatch :: try_new (
1187+ Arc :: new ( Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Null , true ) ] ) ) ,
1188+ vec ! [ Arc :: new( NullArray :: new( 3 ) ) ] ,
1189+ ) ?,
1190+ & vec ! [ 0 ] ,
1191+ ) ;
1192+ assert ! ( result. is_err( ) ) ;
1193+ assert_starts_with (
1194+ result. err ( ) . unwrap ( ) . message ( ) . as_ref ( ) ,
1195+ "Invalid batch column at '0' has null but schema specifies non-nullable" ,
1196+ ) ;
1197+ Ok ( ( ) )
1198+ }
1199+
1200+ fn assert_starts_with ( actual : impl AsRef < str > , expected_prefix : impl AsRef < str > ) {
1201+ let actual = actual. as_ref ( ) ;
1202+ let expected_prefix = expected_prefix. as_ref ( ) ;
1203+ assert ! (
1204+ actual. starts_with( expected_prefix) ,
1205+ "Expected '{}' to start with '{}'" ,
1206+ actual,
1207+ expected_prefix
1208+ ) ;
1209+ }
1210+ }
0 commit comments