@@ -325,7 +325,13 @@ impl FileFormat for CsvFormat {
             let stream = self.read_to_delimited_chunks(store, object).await;
             let (schema, records_read) = self
                 .infer_schema_from_stream(state, records_to_read, stream)
-                .await?;
+                .await
+                .map_err(|err| {
+                    DataFusionError::Context(
+                        format!("Error when processing CSV file {}", &object.location),
+                        Box::new(err),
+                    )
+                })?;
             records_to_read -= records_read;
             schemas.push(schema);
             if records_to_read == 0 {
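The hunk above wraps schema-inference failures in DataFusionError::Context so the error names the file being read. A minimal sketch of the same wrapping pattern as a standalone helper (the with_file_context name is illustrative and not part of this change; it assumes the datafusion_common crate is available):

use datafusion_common::DataFusionError;

// Illustrative helper: wrap any DataFusion error with the path of the
// CSV file being processed, mirroring the map_err added in the hunk above.
fn with_file_context<T>(
    result: Result<T, DataFusionError>,
    location: &str,
) -> Result<T, DataFusionError> {
    result.map_err(|err| {
        DataFusionError::Context(
            format!("Error when processing CSV file {}", location),
            Box::new(err),
        )
    })
}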
@@ -433,11 +439,13 @@ impl CsvFormat {
         let mut total_records_read = 0;
         let mut column_names = vec![];
         let mut column_type_possibilities = vec![];
-        let mut first_chunk = true;
+        let mut record_number = -1;
 
         pin_mut!(stream);
 
         while let Some(chunk) = stream.next().await.transpose()? {
+            record_number += 1;
+            let first_chunk = record_number == 0;
             let mut format = arrow::csv::reader::Format::default()
                 .with_header(
                     first_chunk
@@ -471,14 +479,14 @@ impl CsvFormat {
                         (field.name().clone(), possibilities)
                     })
                     .unzip();
-                first_chunk = false;
             } else {
                 if fields.len() != column_type_possibilities.len() {
                     return exec_err!(
                         "Encountered unequal lengths between records on CSV file whilst inferring schema. \
-                         Expected {} records, found {} records",
+                         Expected {} fields, found {} fields at record {}",
                         column_type_possibilities.len(),
-                        fields.len()
+                        fields.len(),
+                        record_number + 1
                     );
                 }
 
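The second and third hunks replace the first_chunk flag with a running record_number so the field-count mismatch error can report which record failed. A minimal sketch of that bookkeeping over a plain iterator (check_field_counts and its types are illustrative only, not DataFusion API):

// Illustrative sketch: count chunks the same way the change above does,
// and report the 1-based record number when the field count differs.
fn check_field_counts(chunks: impl IntoIterator<Item = Vec<String>>) -> Result<(), String> {
    let mut record_number: i64 = -1;
    let mut expected: Option<usize> = None;

    for fields in chunks {
        record_number += 1;
        let first_chunk = record_number == 0;

        if first_chunk {
            // The first chunk establishes the expected number of fields.
            expected = Some(fields.len());
        } else if Some(fields.len()) != expected {
            return Err(format!(
                "Expected {} fields, found {} fields at record {}",
                expected.unwrap_or(0),
                fields.len(),
                record_number + 1
            ));
        }
    }
    Ok(())
}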