Commit 56b9b13

cloud-fan authored and HyukjinKwon committed
[SPARK-48552][SQL] multi-line CSV schema inference should also throw FAILED_READ_FILE
### What changes were proposed in this pull request?

Multi-line CSV uses `BinaryFileRDD` instead of `FileScanRDD`, so we need to replicate the error handling code from `FileScanRDD`. We already replicate the handling of ignoring missing/corrupted files; this PR replicates the error wrapping code as well.

### Why are the changes needed?

To have a consistent error message.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Updated test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#46890 from cloud-fan/error.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
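For readers unfamiliar with the `FileScanRDD` behaviour being copied, the sketch below illustrates the recovery order the commit message describes. It is a minimal, hypothetical standalone helper: the method name, the Boolean flags, and the by-name `parse` block are placeholders, while `SparkPath` and `QueryExecutionErrors.cannotReadFilesError` are the same internal helpers the diff below uses.

import java.io.{FileNotFoundException, IOException}

import scala.util.control.NonFatal

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.errors.QueryExecutionErrors

// Sketch only: tokenize one file during multi-line CSV schema inference.
// The first two cases (ignore missing/corrupted files) were already replicated
// before this commit; the NonFatal case is the new wrapping, mirroring FileScanRDD.
def tokenizeFile(
    pathString: String,
    ignoreMissingFiles: Boolean,
    ignoreCorruptFiles: Boolean)(parse: => Array[Array[String]]): Array[Array[String]] = {
  try {
    parse
  } catch {
    case _: FileNotFoundException if ignoreMissingFiles =>
      Array.empty[Array[String]]              // skip a file that disappeared
    case _: IOException if ignoreCorruptFiles =>
      Array.empty[Array[String]]              // skip the rest of a corrupted file
    case NonFatal(e) =>
      // Anything else becomes FAILED_READ_FILE, keeping the original error as the cause.
      val path = SparkPath.fromPathString(pathString)
      throw QueryExecutionErrors.cannotReadFilesError(e, path.urlEncoded)
  }
}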
1 parent 9de0a2e commit 56b9b13

4 files changed: +24 -16 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala

Lines changed: 7 additions & 0 deletions
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.datasources.csv
 import java.io.{FileNotFoundException, IOException}
 import java.nio.charset.{Charset, StandardCharsets}
 
+import scala.util.control.NonFatal
+
 import com.univocity.parsers.csv.CsvParser
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -30,10 +32,12 @@ import org.apache.spark.TaskContext
 import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.PATH
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVInferSchema, CSVOptions, UnivocityParser}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
@@ -210,6 +214,9 @@ object MultiLineCSVDataSource extends CSVDataSource with Logging {
           logWarning(log"Skipped the rest of the content in the corrupted file: " +
             log"${MDC(PATH, lines.getPath())}", e)
           Array.empty[Array[String]]
+        case NonFatal(e) =>
+          val path = SparkPath.fromPathString(lines.getPath())
+          throw QueryExecutionErrors.cannotReadFilesError(e, path.urlEncoded)
       }
     }.take(1).headOption match {
       case Some(firstRow) =>
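A side effect visible in the updated CSVSuite test further down: because multi-line schema inference runs over `BinaryFileRDD` in its own job, the wrapped error adds a layer to the cause chain, and the original `EOFException` ends up three `getCause` calls deep (versus one for the single-line path). A small generic helper, illustrative only and not part of this patch, for digging out the innermost cause:

import scala.annotation.tailrec

// Walk getCause until the innermost exception is reached; guards against
// self-referential causes so the recursion always terminates.
@tailrec
def rootCause(t: Throwable): Throwable =
  if (t.getCause == null || (t.getCause eq t)) t
  else rootCause(t.getCause)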

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 8 additions & 4 deletions
@@ -1247,14 +1247,13 @@ abstract class CSVSuite
     val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
     for (pattern <- patterns) {
       withTempPath { path =>
-        val actualPath = path.toPath.toUri.toURL.toString
         val ex = intercept[SparkException] {
           exp.write.format("csv").option("timestampNTZFormat", pattern).save(path.getAbsolutePath)
         }
         checkErrorMatchPVals(
           exception = ex,
           errorClass = "TASK_WRITE_FAILED",
-          parameters = Map("path" -> s"$actualPath.*"))
+          parameters = Map("path" -> s".*${path.getName}.*"))
         val msg = ex.getCause.getMessage
         assert(
           msg.contains("Unsupported field: OffsetSeconds") ||
@@ -1489,16 +1488,21 @@ abstract class CSVSuite
       val e = intercept[SparkException] {
         spark.read.csv(inputFile.toURI.toString).collect()
       }
-      checkError(
+      checkErrorMatchPVals(
         exception = e,
         errorClass = "FAILED_READ_FILE.NO_HINT",
-        parameters = Map("path" -> inputFile.toPath.toUri.toString)
+        parameters = Map("path" -> s".*${inputFile.getName}.*")
       )
       assert(e.getCause.isInstanceOf[EOFException])
       assert(e.getCause.getMessage === "Unexpected end of input stream")
       val e2 = intercept[SparkException] {
         spark.read.option("multiLine", true).csv(inputFile.toURI.toString).collect()
       }
+      checkErrorMatchPVals(
+        exception = e2,
+        errorClass = "FAILED_READ_FILE.NO_HINT",
+        parameters = Map("path" -> s".*${inputFile.getName}.*")
+      )
       assert(e2.getCause.getCause.getCause.isInstanceOf[EOFException])
       assert(e2.getCause.getCause.getCause.getMessage === "Unexpected end of input stream")
     }
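On the test side the changes are mechanical: `checkError` becomes `checkErrorMatchPVals`, which, as used here, matches expected parameter values as regex patterns, so the assertions only require the file name to appear in the reported path instead of hard-coding `inputFile.toPath.toUri.toString`. That matters because the wrapped error now carries a URL-encoded path that need not be byte-for-byte identical to the URI-derived form. A tiny self-contained illustration (both string forms are hypothetical):

import java.io.File

// Hypothetical input file; only the shape of the comparison matters.
val inputFile = new File("/tmp/spark test dir/malformed.csv.gz")
val exactUri = inputFile.toPath.toUri.toString                   // e.g. file:///tmp/spark%20test%20dir/malformed.csv.gz
val reported = "file:/tmp/spark%20test%20dir/malformed.csv.gz"   // an illustrative url-encoded form

assert(reported != exactUri)                            // exact equality would be brittle
assert(reported.matches(s".*${inputFile.getName}.*"))   // the relaxed file-name regex still matches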

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 3 additions & 4 deletions
@@ -1924,10 +1924,10 @@ abstract class JsonSuite
       val e = intercept[SparkException] {
         spark.read.json(inputFile.toURI.toString).collect()
       }
-      checkError(
+      checkErrorMatchPVals(
         exception = e,
         errorClass = "FAILED_READ_FILE.NO_HINT",
-        parameters = Map("path" -> inputFile.toPath.toUri.toString))
+        parameters = Map("path" -> s".*${inputFile.getName}.*"))
       assert(e.getCause.isInstanceOf[EOFException])
       assert(e.getCause.getMessage === "Unexpected end of input stream")
       val e2 = intercept[SparkException] {
@@ -3039,14 +3039,13 @@ abstract class JsonSuite
     val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
     for (pattern <- patterns) {
       withTempPath { path =>
-        val actualPath = path.toPath.toUri.toURL.toString
         val err = intercept[SparkException] {
           exp.write.option("timestampNTZFormat", pattern).json(path.getAbsolutePath)
         }
         checkErrorMatchPVals(
           exception = err,
           errorClass = "TASK_WRITE_FAILED",
-          parameters = Map("path" -> s"$actualPath.*"))
+          parameters = Map("path" -> s".*${path.getName}.*"))

         val msg = err.getCause.getMessage
         assert(

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala

Lines changed: 6 additions & 8 deletions
@@ -266,10 +266,10 @@ class XmlSuite
         .xml(inputFile)
         .collect()
     }
-    checkError(
+    checkErrorMatchPVals(
       exception = exceptionInParsing,
       errorClass = "FAILED_READ_FILE.NO_HINT",
-      parameters = Map("path" -> Path.of(inputFile).toUri.toString))
+      parameters = Map("path" -> s".*$inputFile.*"))
     checkError(
       exception = exceptionInParsing.getCause.asInstanceOf[SparkException],
       errorClass = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION",
@@ -298,10 +298,10 @@ class XmlSuite
         .xml(inputFile)
         .show()
     }
-    checkError(
+    checkErrorMatchPVals(
       exception = exceptionInParsing,
       errorClass = "FAILED_READ_FILE.NO_HINT",
-      parameters = Map("path" -> Path.of(inputFile).toUri.toString))
+      parameters = Map("path" -> s".*$inputFile.*"))
     checkError(
       exception = exceptionInParsing.getCause.asInstanceOf[SparkException],
       errorClass = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION",
@@ -2441,15 +2441,14 @@ class XmlSuite
     val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
     for (pattern <- patterns) {
       withTempPath { path =>
-        val actualPath = path.toPath.toUri.toURL.toString
         val err = intercept[SparkException] {
           exp.write.option("timestampNTZFormat", pattern)
             .option("rowTag", "ROW").xml(path.getAbsolutePath)
         }
         checkErrorMatchPVals(
           exception = err,
           errorClass = "TASK_WRITE_FAILED",
-          parameters = Map("path" -> s"$actualPath.*"))
+          parameters = Map("path" -> s".*${path.getName}.*"))
         val msg = err.getCause.getMessage
         assert(
           msg.contains("Unsupported field: OffsetSeconds") ||
@@ -2968,11 +2967,10 @@ class XmlSuite
         .mode(SaveMode.Overwrite)
         .xml(path)
     }
-    val actualPath = Path.of(dir.getAbsolutePath).toUri.toURL.toString.stripSuffix("/")
     checkErrorMatchPVals(
       exception = e,
       errorClass = "TASK_WRITE_FAILED",
-      parameters = Map("path" -> s"$actualPath.*"))
+      parameters = Map("path" -> s".*${dir.getName}.*"))
     assert(e.getCause.isInstanceOf[XMLStreamException])
     assert(e.getCause.getMessage.contains(errorMsg))
   }
