Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,13 @@ impl FileFormat for CsvFormat {
let stream = self.read_to_delimited_chunks(store, object).await;
let (schema, records_read) = self
.infer_schema_from_stream(state, records_to_read, stream)
.await?;
.await
.map_err(|err| {
DataFusionError::Context(
format!("Error when processing CSV file {}", &object.location),
Box::new(err),
)
})?;
records_to_read -= records_read;
schemas.push(schema);
if records_to_read == 0 {
Expand Down Expand Up @@ -433,11 +439,13 @@ impl CsvFormat {
let mut total_records_read = 0;
let mut column_names = vec![];
let mut column_type_possibilities = vec![];
let mut first_chunk = true;
let mut record_number = -1;

pin_mut!(stream);

while let Some(chunk) = stream.next().await.transpose()? {
record_number += 1;
let first_chunk = record_number == 0;
let mut format = arrow::csv::reader::Format::default()
.with_header(
first_chunk
Expand Down Expand Up @@ -471,14 +479,14 @@ impl CsvFormat {
(field.name().clone(), possibilities)
})
.unzip();
first_chunk = false;
} else {
if fields.len() != column_type_possibilities.len() {
return exec_err!(
"Encountered unequal lengths between records on CSV file whilst inferring schema. \
Expected {} records, found {} records",
Expected {} fields, found {} fields at record {}",
column_type_possibilities.len(),
fields.len()
fields.len(),
record_number + 1
);
}

Expand Down