@@ -1047,8 +1047,10 @@ def _task_to_record_batches(
 
     fragment_scanner = ds.Scanner.from_fragment(
         fragment=fragment,
-        # We always use large types in memory as it uses larger offsets
-        # That can chunk more row values into the buffers
+        # With PyArrow 16.0.0 there is an issue with casting record-batches:
+        # https://github.com/apache/arrow/issues/41884
+        # https://github.com/apache/arrow/issues/43183
+        # Would be good to remove this later on
         schema=_pyarrow_schema_ensure_large_types(physical_schema),
         # This will push down the query to Arrow.
         # But in case there are positional deletes, we have to apply them first
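An illustrative sketch, not part of the patch: what forcing the scanner schema to Arrow's "large" types means in plain PyArrow. `_pyarrow_schema_ensure_large_types` is pyiceberg's own helper; `widen_to_large_types` below is a hypothetical stand-in.

# Hypothetical stand-in for pyiceberg's _pyarrow_schema_ensure_large_types:
# variable-width columns are widened to their 64-bit-offset ("large") counterparts,
# so every record batch produced by the scanner shares one in-memory representation.
import pyarrow as pa


def widen_to_large_types(schema: pa.Schema) -> pa.Schema:
    def widen(t: pa.DataType) -> pa.DataType:
        if pa.types.is_string(t):
            return pa.large_string()
        if pa.types.is_binary(t):
            return pa.large_binary()
        if pa.types.is_list(t):
            return pa.large_list(widen(t.value_type))
        return t

    return pa.schema([pa.field(f.name, widen(f.type), nullable=f.nullable) for f in schema])


print(widen_to_large_types(pa.schema([("name", pa.string()), ("blob", pa.binary())])))
# name: large_string, blob: large_binary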
@@ -1084,11 +1086,17 @@ def _task_to_table(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
-) -> pa.Table:
-    batches = _task_to_record_batches(
-        fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping
+) -> Optional[pa.Table]:
+    batches = list(
+        _task_to_record_batches(
+            fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping
+        )
     )
-    return pa.Table.from_batches(batches, schema=schema_to_pyarrow(projected_schema, include_field_ids=False))
+
+    if len(batches) > 0:
+        return pa.Table.from_batches(batches)
+    else:
+        return None
 
 
 def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
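A minimal sketch with made-up data, not from the patch: why `_task_to_table` now returns `Optional[pa.Table]`. Without an explicit schema, `pa.Table.from_batches` cannot build a table from zero batches, so an empty task yields `None` and callers keep only the non-empty results.

# Sketch of the empty-task case: Table.from_batches() needs at least one batch
# (or an explicit schema), so an empty scan task maps to None instead of a table.
from typing import List, Optional

import pyarrow as pa


def batches_to_table(batches: List[pa.RecordBatch]) -> Optional[pa.Table]:
    if len(batches) > 0:
        return pa.Table.from_batches(batches)  # schema is taken from the first batch
    return None  # nothing was read for this task


batch = pa.RecordBatch.from_pydict({"id": [1, 2, 3]})
tables = [t for t in (batches_to_table([batch]), batches_to_table([])) if t is not None]
assert len(tables) == 1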
@@ -1192,7 +1200,7 @@ def project_table(
     if len(tables) < 1:
         return pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema, include_field_ids=False))
 
-    result = pa.concat_tables(tables)
+    result = pa.concat_tables(tables, promote_options="permissive")
 
     if limit is not None:
        return result.slice(0, limit)
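A small sketch with made-up tables: per-task tables may no longer agree on the exact Arrow representation (for example `string` vs `large_string`), and `promote_options="permissive"` lets `concat_tables` unify them instead of raising. The option requires a recent PyArrow release.

# Two tables whose schemas differ only in representation; permissive promotion
# unifies them to a common type at concat time.
import pyarrow as pa

small = pa.table({"name": pa.array(["a", "b"], type=pa.string())})
large = pa.table({"name": pa.array(["c"], type=pa.large_string())})

combined = pa.concat_tables([small, large], promote_options="permissive")
print(combined.schema)  # the name column is promoted to a single common type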
@@ -1271,54 +1279,62 @@ def project_batches(
 
 
 def to_requested_schema(
-    requested_schema: Schema, file_schema: Schema, batch: pa.RecordBatch, downcast_ns_timestamp_to_us: bool = False
+    requested_schema: Schema,
+    file_schema: Schema,
+    batch: pa.RecordBatch,
+    downcast_ns_timestamp_to_us: bool = False,
+    include_field_ids: bool = False,
 ) -> pa.RecordBatch:
+    # We could re-use some of these visitors
     struct_array = visit_with_partner(
-        requested_schema, batch, ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us), ArrowAccessor(file_schema)
+        requested_schema,
+        batch,
+        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids),
+        ArrowAccessor(file_schema),
     )
-
-    arrays = []
-    fields = []
-    for pos, field in enumerate(requested_schema.fields):
-        array = struct_array.field(pos)
-        arrays.append(array)
-        fields.append(pa.field(field.name, array.type, field.optional))
-    return pa.RecordBatch.from_arrays(arrays, schema=pa.schema(fields))
+    return pa.RecordBatch.from_struct_array(struct_array)
 
 
 class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]):
     file_schema: Schema
+    _include_field_ids: bool
 
-    def __init__(self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False):
+    def __init__(self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False) -> None:
         self.file_schema = file_schema
+        self._include_field_ids = include_field_ids
         self.downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
 
     def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         file_field = self.file_schema.find_field(field.field_id)
+
         if field.field_type.is_primitive:
             if field.field_type != file_field.field_type:
-                return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=False))
-            elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=False)) != values.type:
-                # if file_field and field_type (e.g. String) are the same
-                # but the pyarrow type of the array is different from the expected type
-                # (e.g. string vs larger_string), we want to cast the array to the larger type
-                safe = True
+                return values.cast(
+                    schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids)
+                )
+            elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
+                # Downcasting of nanoseconds to microseconds
                 if (
                     pa.types.is_timestamp(target_type)
                     and target_type.unit == "us"
                     and pa.types.is_timestamp(values.type)
                     and values.type.unit == "ns"
                 ):
-                    safe = False
-                return values.cast(target_type, safe=safe)
+                    return values.cast(target_type, safe=False)
         return values
 
     def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Field:
+        metadata = {}
+        if field.doc:
+            metadata[PYARROW_FIELD_DOC_KEY] = field.doc
+        if self._include_field_ids:
+            metadata[PYARROW_PARQUET_FIELD_ID_KEY] = str(field.field_id)
+
         return pa.field(
             name=field.name,
             type=arrow_type,
             nullable=field.optional,
-            metadata={DOC: field.doc} if field.doc is not None else None,
+            metadata=metadata,
         )
 
     def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[pa.Array]:
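An illustrative sketch with toy data, not from the patch: the two PyArrow pieces this hunk leans on. `RecordBatch.from_struct_array` turns the projected struct array straight into a batch, and field-level metadata under the `PARQUET:field_id` key (what pyiceberg's `PYARROW_PARQUET_FIELD_ID_KEY` names) is how `_construct_field` carries Iceberg field IDs when `include_field_ids` is set.

# Build a RecordBatch directly from a StructArray, and attach Parquet field-id
# metadata to a field the way _construct_field does when include_field_ids=True.
import pyarrow as pa

struct = pa.array(
    [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}],
    type=pa.struct([pa.field("id", pa.int64()), pa.field("name", pa.large_string())]),
)
batch = pa.RecordBatch.from_struct_array(struct)  # column names/types come from the struct

field_with_id = pa.field("id", pa.int64(), nullable=False, metadata={"PARQUET:field_id": "1"})
print(batch.schema)
print(field_with_id.metadata)  # {b'PARQUET:field_id': b'1'}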
@@ -1960,14 +1976,15 @@ def write_parquet(task: WriteTask) -> DataFile:
                 file_schema=table_schema,
                 batch=batch,
                 downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+                include_field_ids=True,
             )
             for batch in task.record_batches
         ]
         arrow_table = pa.Table.from_batches(batches)
         file_path = f'{table_metadata.location}/data/{task.generate_data_file_path("parquet")}'
         fo = io.new_output(file_path)
         with fo.create(overwrite=True) as fos:
-            with pq.ParquetWriter(fos, schema=file_schema.as_arrow(), **parquet_writer_kwargs) as writer:
+            with pq.ParquetWriter(fos, schema=arrow_table.schema, **parquet_writer_kwargs) as writer:
                 writer.write(arrow_table, row_group_size=row_group_size)
         statistics = data_file_statistics_from_parquet_metadata(
             parquet_metadata=writer.writer.metadata,
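A minimal sketch with a made-up path and data: the writer now takes its schema from the Arrow table that was just converted, so the field metadata attached during `to_requested_schema` (including the field IDs) is exactly the schema the Parquet file is written with.

# Writer schema comes from the converted table itself rather than being re-derived,
# so field-level metadata such as PARQUET:field_id travels with the written file.
import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([pa.field("id", pa.int64(), metadata={"PARQUET:field_id": "1"})])
arrow_table = pa.table({"id": pa.array([1, 2, 3], type=pa.int64())}, schema=schema)

with pq.ParquetWriter("/tmp/example.parquet", schema=arrow_table.schema) as writer:
    writer.write(arrow_table, row_group_size=128)

print(pq.read_schema("/tmp/example.parquet"))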