Closed
Commits: 62 (changes shown from 17 commits)
112ce2d
Checks column names are compatible to provided schema
MaxGekk Mar 20, 2018
a85ccce
Checking header is matched to schema in per-line mode
MaxGekk Mar 20, 2018
75e1534
Extract header and check that it is matched to schema
MaxGekk Mar 20, 2018
8eb45b8
Checking column names in header in multiLine mode
MaxGekk Mar 21, 2018
9b1a986
Adding the checkHeader option with true by default
MaxGekk Mar 21, 2018
6442633
Fix csv test by changing headers or disabling header checking
MaxGekk Mar 21, 2018
9440d8a
Adding comment for the checkHeader option
MaxGekk Mar 21, 2018
9f91ce7
Added comments
MaxGekk Mar 21, 2018
0878f7a
Adding a space between column names
MaxGekk Mar 21, 2018
a341dd7
Fix a test: checking name duplication in schemas
MaxGekk Mar 21, 2018
98c27ea
Fixing the test and adding ticket number to test's title
MaxGekk Mar 23, 2018
811df6f
Refactoring - removing unneeded parameter
MaxGekk Mar 23, 2018
691cfbc
Output filename in the exception
MaxGekk Mar 25, 2018
efb0105
PySpark: adding a test and checkHeader parameter
MaxGekk Mar 25, 2018
c9f5e14
Removing unneeded parameter - fileName
MaxGekk Mar 25, 2018
e195838
Fix for pycodestyle checks
MaxGekk Mar 25, 2018
d6d370d
Adding description of the checkHeader option
MaxGekk Mar 25, 2018
acd6d2e
Improving error messages and handling the case when header size is no…
MaxGekk Mar 26, 2018
13892fd
Refactoring: check header by calling an uniVocity method
MaxGekk Mar 26, 2018
476b517
Refactoring: convert val to def
MaxGekk Mar 26, 2018
f8167e4
Parse header only if the checkHeader option is true
MaxGekk Mar 26, 2018
d068f6c
Moving header checks to CSVDataSource
MaxGekk Mar 26, 2018
08cfcf4
Making uniVocity wrapper unaware of header
MaxGekk Mar 26, 2018
f6a1694
Fix the test: error mesage was changed
MaxGekk Mar 27, 2018
adbedf3
Revert CSV tests as it was before the option was introduced
MaxGekk Mar 31, 2018
0904daf
Renaming checkHeader to enforceSchema
MaxGekk Mar 31, 2018
191b415
Pass required parameter
MaxGekk Apr 1, 2018
718f7ca
Merge branch 'master' of github.com:apache/spark into check-column-names
MaxGekk Apr 5, 2018
75c1ce6
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk Apr 13, 2018
ab9c514
Addressing Xiao Li's review comments
MaxGekk Apr 13, 2018
0405863
Making header validation case sensitive
MaxGekk Apr 13, 2018
714c66d
Describing enforceSchema in PySpark's csv method
MaxGekk Apr 13, 2018
78d9f66
Respect to caseSensitive parameter
MaxGekk Apr 13, 2018
b43a7c7
Check header on csv parsing from dataset of strings
MaxGekk Apr 13, 2018
a5f2916
Merge branch 'master' of github.com:apache/spark into check-column-names
MaxGekk Apr 18, 2018
9b2d403
Make Scala style checker happy
MaxGekk Apr 18, 2018
1fffc16
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk Apr 27, 2018
ad6cda4
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk May 1, 2018
4bdabe2
Merge branch 'master' of github.com:apache/spark into check-column-names
MaxGekk May 4, 2018
2bd2713
Merge branch 'master' into check-column-names
MaxGekk May 14, 2018
b4bfd1d
Merge branch 'check-column-names' of github.com:MaxGekk/spark-1 into …
MaxGekk May 14, 2018
21f8b10
Removing a space to make Scala style checker happy.
MaxGekk May 14, 2018
aca4db9
Merge branch 'master' of github.com:apache/spark into check-column-names
MaxGekk May 16, 2018
e3b4275
Addressing review comments
MaxGekk May 16, 2018
d704766
Removing unnecessary empty checks
MaxGekk May 17, 2018
04199e0
Addressing review comments
MaxGekk May 17, 2018
d5fde52
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk May 17, 2018
795a878
Addressing Hyukjin Kwon's review comments
MaxGekk May 17, 2018
05fc7cd
Improving description of the option
MaxGekk May 18, 2018
9606711
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk May 18, 2018
11c7591
Addressing Wenchen Fan's review comment
MaxGekk May 18, 2018
7dce1e7
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk May 22, 2018
c008328
Output warnings when enforceSchema is enabled and the schema is not c…
MaxGekk May 25, 2018
9f7c440
Added tests for inferSchema is true and enforceSchema is false
MaxGekk May 25, 2018
e83ad60
Rename dropFirstRecord to shouldDropHeader
MaxGekk May 25, 2018
26ae4f9
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk May 25, 2018
4b6495b
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk Jun 1, 2018
c5ee207
Renaming of 'is not conform' to 'does not conform'
MaxGekk Jun 1, 2018
a2cbb7b
Fix Scala coding style
MaxGekk Jun 1, 2018
70e2b75
Added description of checkHeaderColumnNames's arguments
MaxGekk Jun 1, 2018
e7c3ace
Test checks a warning presents in logs
MaxGekk Jun 1, 2018
3b37712
fix python tests
MaxGekk Jun 1, 2018
8 changes: 6 additions & 2 deletions python/pyspark/sql/readwriter.py
@@ -335,7 +335,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None):
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
checkHeader=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -360,6 +361,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
character. By default (None), it is disabled.
:param header: uses the first line as names of columns. If None is set, it uses the
default value, ``false``.
:param checkHeader: compares column names in the header with field names in the schema
and outputs an error if names are not matched.
If None is set, it uses the default value, ``true``.
:param inferSchema: infers the input schema automatically from data. It requires one extra
pass over the data. If None is set, it uses the default value, ``false``.
:param ignoreLeadingWhiteSpace: A flag indicating whether or not leading whitespaces from
@@ -436,7 +440,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping)
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, checkHeader=checkHeader)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
15 changes: 15 additions & 0 deletions python/pyspark/sql/tests.py
@@ -2974,6 +2974,21 @@ def test_create_dateframe_from_pandas_with_dst(self):
os.environ['TZ'] = orig_env_tz
time.tzset()

def test_checking_csv_header(self):
tmpPath = tempfile.mkdtemp()
Review comment (Member): try-except
Review comment (Member): tmpPath -> tmp_path or path
shutil.rmtree(tmpPath)
self.spark.createDataFrame([[1, 1000], [2000, 2]]).\
toDF('f1', 'f2').write.option("header", "true").csv(tmpPath)
Review comment (Member): -> .toDF('f1', 'f2') ...

schema = StructType([
StructField('f2', IntegerType(), nullable=True),
StructField('f1', IntegerType(), nullable=True)])
df = self.spark.read.option('header', 'true').schema(schema).csv(tmpPath)
self.assertRaisesRegexp(
Exception,
"Fields in the header of csv file are not matched to field names of the schema",
lambda: df.collect())
shutil.rmtree(tmpPath)


class HiveSparkSubmitTests(SparkSubmitTests):

DataFrameReader.scala
@@ -524,6 +524,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`comment` (default empty string): sets a single character used for skipping lines
* beginning with this character. By default, it is disabled.</li>
* <li>`header` (default `false`): uses the first line as names of columns.</li>
* <li>`checkHeader` (default `true`): compares column names in the header with field names
* in the schema and outputs an error if names are not matched.</li>
* <li>`inferSchema` (default `false`): infers the input schema automatically from data. It
* requires one extra pass over the data.</li>
* <li>`ignoreLeadingWhiteSpace` (default `false`): a flag indicating whether or not leading
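A minimal usage sketch of the option documented above, assuming an existing SparkSession named `spark` and an illustrative file path; the option name follows this revision of the diff (`checkHeader`), which commit 0904daf later renames to `enforceSchema`:

    import org.apache.spark.sql.types._

    val schema = new StructType().add("f1", DoubleType).add("f2", DoubleType)

    // Column names in the file header are compared with the schema field names;
    // a mismatch raises an error while the check is enabled (the default here).
    val df = spark.read
      .schema(schema)
      .option("header", "true")
      .option("checkHeader", "true")
      .csv("/tmp/example.csv")

    // Disabling the check restores the previous behavior of trusting the given schema.
    val relaxed = spark.read
      .schema(schema)
      .option("header", "true")
      .option("checkHeader", "false")
      .csv("/tmp/example.csv")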
CSVDataSource.scala
@@ -50,7 +50,9 @@ abstract class CSVDataSource extends Serializable {
conf: Configuration,
file: PartitionedFile,
parser: UnivocityParser,
schema: StructType): Iterator[InternalRow]
schema: StructType, // Schema of projection
dataSchema: StructType // Schema of data in csv files
): Iterator[InternalRow]
Review comment (@gatorsmile, Apr 2, 2018):
  /**
   * Parse a [[PartitionedFile]] into [[InternalRow]] instances.
   * @param requiredSchema: Schema of projection
   * @param dataSchema: Schema of data in CSV files
   */
  def readFile(
      conf: Configuration,
      file: PartitionedFile,
      parser: UnivocityParser,
      requiredSchema: StructType,
      dataSchema: StructType): Iterator[InternalRow]


/**
* Infers the schema from `inputPaths` files.
@@ -127,7 +129,8 @@ object TextInputCSVDataSource extends CSVDataSource {
conf: Configuration,
file: PartitionedFile,
parser: UnivocityParser,
schema: StructType): Iterator[InternalRow] = {
schema: StructType,
dataSchema: StructType): Iterator[InternalRow] = {
val lines = {
val linesReader = new HadoopFileLinesReader(file, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
@@ -136,8 +139,22 @@
}
}

val shouldDropHeader = parser.options.headerFlag && file.start == 0
UnivocityParser.parseIterator(lines, shouldDropHeader, parser, schema)
val hasHeader = parser.options.headerFlag && file.start == 0
if (hasHeader) {
// Checking that column names in the header are matched to field names of the schema.
// The header will be removed from lines.
// Note: if there are only comments in the first block, the header would
// probably not be extracted.
val checkHeader = UnivocityParser.checkHeader(
parser,
dataSchema,
_: String,
file.filePath
)
CSVUtils.extractHeader(lines, parser.options).foreach(checkHeader(_))
}

UnivocityParser.parseIterator(lines, parser, schema)
}

override def infer(
@@ -204,24 +221,35 @@ object MultiLineCSVDataSource extends CSVDataSource {
conf: Configuration,
file: PartitionedFile,
parser: UnivocityParser,
schema: StructType): Iterator[InternalRow] = {
schema: StructType,
dataSchema: StructType): Iterator[InternalRow] = {
val checkHeader = UnivocityParser.checkHeaderColumnNames(
parser,
dataSchema,
_: Array[String],
file.filePath
)
UnivocityParser.parseStream(
CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))),
parser.options.headerFlag,
parser,
schema)
schema,
checkHeader)
}

override def infer(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): StructType = {
val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
// The header is not checked because there is no schema against which it could be checked
val checkHeader = (_: Array[String]) => ()
csv.flatMap { lines =>
val path = new Path(lines.getPath())
UnivocityParser.tokenizeStream(
CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
shouldDropHeader = false,
checkHeader,
Review comment (Member): It seems that the checkHeader is doing nothing here. Why need this?

Review reply (Member Author): It is defined here because header is an entity of CSV level. Stream tokenizer (in UnivocityParser) works on lower level.

Review comment (@gengliangwang, May 17, 2018): No, I mean the one in line 302 is doing nothing.

new CsvParser(parsedOptions.asParserSettings))
}.take(1).headOption match {
case Some(firstRow) =>
Expand All @@ -233,6 +261,7 @@ object MultiLineCSVDataSource extends CSVDataSource {
lines.getConfiguration,
new Path(lines.getPath())),
parsedOptions.headerFlag,
checkHeader,
new CsvParser(parsedOptions.asParserSettings))
}
CSVInferSchema.infer(tokenRDD, header, parsedOptions)
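The readFile changes above pass the header check down as a function value by partially applying UnivocityParser.checkHeader / checkHeaderColumnNames. A standalone sketch of that pattern with simplified types (the names and signatures below are illustrative, not the PR's exact API):

    // The check itself: compare schema field names with header column names pairwise.
    def checkHeaderColumnNames(
        fieldNames: Seq[String],
        columnNames: Array[String],
        fileName: String): Unit = {
      val matched = fieldNames.zip(columnNames).forall { case (f, c) => f == c }
      if (!matched) {
        throw new IllegalArgumentException(
          s"Header of $fileName does not match the schema: " +
            s"header = ${columnNames.mkString(", ")}, schema = ${fieldNames.mkString(", ")}")
      }
    }

    // Fix the schema and file name once; the remaining Array[String] => Unit can be
    // passed to lower layers that know nothing about schemas or file paths.
    val checkHeader: Array[String] => Unit =
      checkHeaderColumnNames(Seq("f1", "f2"), _: Array[String], "cars.csv")

    checkHeader(Array("f1", "f2"))    // passes
    // checkHeader(Array("f2", "f1")) // would throw IllegalArgumentException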
CSVFileFormat.scala
@@ -129,7 +129,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
parsedOptions)
CSVDataSource(parsedOptions).readFile(conf, file, parser, requiredSchema)
CSVDataSource(parsedOptions).readFile(conf, file, parser, requiredSchema, dataSchema)
}
}

CSVOptions.scala
@@ -150,6 +150,12 @@ class CSVOptions(

val isCommentSet = this.comment != '\u0000'

/**
* The option enables checks of headers in csv files. In particular, column names
* are matched to field names of provided schema.
*/
val checkHeader = getBool("checkHeader", true)
Review comment (Member): Since the default is true, will it break the current app? This is a behavior change. We need to document it in the migration guide.

Review reply (Member Author): Yes, it will break apps that declare explicitly schemas with field names different from header's column names like for example in a test of the PR that I modified. I will add info about the option to exception's message and update the migration guide.


def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
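A sketch of the behavior change discussed in the thread above, with assumed file contents and paths: an application that supplies a schema whose field names differ from the file's header now fails with the default setting, and can opt out via the new option.

    import org.apache.spark.sql.types._

    // Illustrative only. Suppose /data/cars.csv starts with the header line "year,make".
    // With checkHeader at its default (true) the read below now fails because the schema
    // names differ; setting it to false keeps the pre-PR behavior of trusting the schema.
    val userSchema = new StructType()
      .add("yearMade", DoubleType)   // the file header says "year"
      .add("makeName", StringType)   // the file header says "make"

    val cars = spark.read
      .schema(userSchema)
      .option("header", "true")
      .option("checkHeader", "false") // opt out of the new check
      .csv("/data/cars.csv")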
CSVUtils.scala
@@ -67,24 +67,28 @@ object CSVUtils {
}
}

/**
* Drop header line so that only data can remain.
* This is similar with `filterHeaderLine` above and currently being used in CSV reading path.
*/
def dropHeaderLine(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
val nonEmptyLines = if (options.isCommentSet) {
def skipComments(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
if (options.isCommentSet) {
val commentPrefix = options.comment.toString
iter.dropWhile { line =>
line.trim.isEmpty || line.trim.startsWith(commentPrefix)
}
} else {
iter.dropWhile(_.trim.isEmpty)
}

if (nonEmptyLines.hasNext) nonEmptyLines.drop(1)
iter
}

/**
* Extracts header and moves iterator forward so that only data remains in it
*/
def extractHeader(iter: Iterator[String], options: CSVOptions): Option[String] = {
val nonEmptyLines = skipComments(iter, options)
if (nonEmptyLines.hasNext) {
Some(nonEmptyLines.next())
} else {
None
}
}
/**
* Helper method that converts string representation of a character to actual character.
* It handles some Java escaped strings and throws exception if given string is longer than one
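A standalone sketch of the extractHeader behavior introduced above, with CSVOptions replaced by an assumed '#' comment prefix: blank and comment lines are skipped, the first remaining line is returned as the header, and the same iterator then yields only data lines.

    val lines = Iterator("# generated file", "", "f1,f2", "1.0,2.0", "3.0,4.0")

    // Skip comments and blank lines, then take the first remaining line as the header.
    val nonEmpty = lines.dropWhile(l => l.trim.isEmpty || l.trim.startsWith("#"))
    val header: Option[String] = if (nonEmpty.hasNext) Some(nonEmpty.next()) else None

    println(header)           // Some(f1,f2)
    println(nonEmpty.toList)  // List(1.0,2.0, 3.0,4.0)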
UnivocityParser.scala
@@ -237,8 +237,9 @@ private[csv] object UnivocityParser {
def tokenizeStream(
inputStream: InputStream,
shouldDropHeader: Boolean,
checkHeader: Array[String] => Unit,
tokenizer: CsvParser): Iterator[Array[String]] = {
convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader)(tokens => tokens)
}

/**
@@ -248,26 +249,30 @@
inputStream: InputStream,
shouldDropHeader: Boolean,
parser: UnivocityParser,
schema: StructType): Iterator[InternalRow] = {
schema: StructType,
checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
val tokenizer = parser.tokenizer
val safeParser = new FailureSafeParser[Array[String]](
input => Seq(parser.convert(input)),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord)
convertStream(inputStream, shouldDropHeader, tokenizer) { tokens =>
convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
safeParser.parse(tokens)
}.flatten
}

private def convertStream[T](
inputStream: InputStream,
shouldDropHeader: Boolean,
Review comment (Member): BTW, why did we rename this variable?

Review reply (Member Author): To show what exactly the parameter controls - dropping the first record in the stream. Responsibility for header manipulation belongs to higher level - CSVDataSource, UnivocityParser doesn't know about header at least for now. Do you believe it should be renamed back?

tokenizer: CsvParser)(convert: Array[String] => T) = new Iterator[T] {
tokenizer: CsvParser,
checkHeader: Array[String] => Unit
)(convert: Array[String] => T) = new Iterator[T] {
tokenizer.beginParsing(inputStream)
private var nextRecord = {
if (shouldDropHeader) {
tokenizer.parseNext()
val header = tokenizer.parseNext()
checkHeader(header)
}
tokenizer.parseNext()
}
@@ -289,27 +294,52 @@
*/
def parseIterator(
lines: Iterator[String],
shouldDropHeader: Boolean,
parser: UnivocityParser,
schema: StructType): Iterator[InternalRow] = {
val options = parser.options

val linesWithoutHeader = if (shouldDropHeader) {
// Note that if there are only comments in the first block, the header would probably
// be not dropped.
CSVUtils.dropHeaderLine(lines, options)
} else {
lines
}

val filteredLines: Iterator[String] =
CSVUtils.filterCommentAndEmpty(linesWithoutHeader, options)
CSVUtils.filterCommentAndEmpty(lines, options)
Review comment (Member): Now, it fits a single line. Please combine line 302 and 303

val safeParser = new FailureSafeParser[String](
input => Seq(parser.parse(input)),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord)

Review comment (Member): i'd revert it back

filteredLines.flatMap(safeParser.parse)
}

def checkHeaderColumnNames(
parser: UnivocityParser,
schema: StructType,
columnNames: Array[String],
fileName: String
): Unit = {
if (parser.options.checkHeader && columnNames != null) {
val fieldNames = schema.map(_.name)
val isMatched = fieldNames.zip(columnNames).forall { pair =>
val (nameInSchema, nameInHeader) = pair
nameInSchema == nameInHeader
Review comment (Member): Do we care the case sensitivity here?

Review reply (Member Author): We do not declare case sensitivity of CSV inputs in our docs. Also I have not found explicit statement in csv descriptions about case sensitivity. It seems it is up to implementations how to handle such cases. For example, Apache Commons allow to configure the behavior: https://commons.apache.org/proper/commons-csv/apidocs/index.html .

}
if (!isMatched) {
throw new IllegalArgumentException(
s"""|Fields in the header of csv file are not matched to field names of the schema:
| Header: ${columnNames.mkString(", ")}
| Schema: ${fieldNames.mkString(", ")}
|CSV file: $fileName""".stripMargin
)
}
}
}

def checkHeader(
parser: UnivocityParser,
schema: StructType,
header: String,
fileName: String
): Unit = {
lazy val columnNames = parser.tokenizer.parseLine(header)
checkHeaderColumnNames(parser, schema, columnNames, fileName)
}
}
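Per the case-sensitivity thread above and the later commit "Respect to caseSensitive parameter", the check eventually honors Spark's case-sensitivity setting. A hedged sketch of what such a comparison could look like (an assumption about the eventual shape, not the PR's exact code):

    // Assumed helper: compare header column names with schema field names, optionally
    // ignoring case, as a stand-in for a caseSensitive-aware check.
    def namesMatch(
        fieldNames: Seq[String],
        columnNames: Seq[String],
        caseSensitive: Boolean): Boolean = {
      fieldNames.zip(columnNames).forall { case (f, c) =>
        if (caseSensitive) f == c else f.equalsIgnoreCase(c)
      }
    }

    namesMatch(Seq("f1", "f2"), Seq("F1", "F2"), caseSensitive = false)  // true
    namesMatch(Seq("f1", "f2"), Seq("F1", "F2"), caseSensitive = true)   // false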
CSVSuite.scala
@@ -252,7 +252,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
|(yearMade double, makeName string, modelName string, priceTag decimal,
| comments string, grp string)
|USING csv
|OPTIONS (path "${testFile(carsTsvFile)}", header "true", delimiter "\t")
|OPTIONS (
| path "${testFile(carsTsvFile)}",
| header "true", checkHeader "false",
| delimiter "\t"
|)
""".stripMargin.replaceAll("\n", " "))

assert(
@@ -275,7 +279,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
test("test for blank column names on read and select columns") {
val cars = spark.read
.format("csv")
.options(Map("header" -> "true", "inferSchema" -> "true"))
.options(Map("header" -> "true", "checkHeader" -> "false", "inferSchema" -> "true"))
.load(testFile(carsBlankColName))

assert(cars.select("customer").collect().size == 2)
@@ -348,15 +352,15 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
spark.sql(
s"""
|CREATE TEMPORARY VIEW carsTable
|(yearMade double, makeName string, modelName string, comments string, blank string)
|(year double, make string, model string, comment string, blank string)
|USING csv
|OPTIONS (path "${testFile(carsFile)}", header "true")
""".stripMargin.replaceAll("\n", " "))

val cars = spark.table("carsTable")
verifyCars(cars, withHeader = true, checkHeader = false, checkValues = false)
assert(
cars.schema.fieldNames === Array("yearMade", "makeName", "modelName", "comments", "blank"))
cars.schema.fieldNames === Array("year", "make", "model", "comment", "blank"))
}
}

@@ -1279,4 +1283,30 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
Row("0,2013-111-11 12:13:14") :: Row(null) :: Nil
)
}

def checkHeader(multiLine: String): Unit = {
Review comment (Member): Could you also test it when setting header to true and false?

Review reply (Member Author): added

test(s"SPARK-23786: Checking column names against schema ($multiLine)") {
Review comment (@gatorsmile, Apr 2, 2018): Below is preferred, since it is easier for us to run the test in IntelliJ

test("SPARK-23786: Checking column names against schema (multiline)") {
  checkHeader(multiLine = true)
}

withTempPath { path =>
import collection.JavaConverters._
Review comment (Member): I'd put this on the top

val oschema = new StructType().add("f1", DoubleType).add("f2", DoubleType)
val odf = spark.createDataFrame(List(Row(1.0, 1234.5)).asJava, oschema)
odf.write.option("header", "true").csv(path.getCanonicalPath)
val ischema = new StructType().add("f2", DoubleType).add("f1", DoubleType)
val exception = intercept[SparkException] {
spark.read
.schema(ischema)
.option("multiLine", multiLine)
.option("header", "true")
.option("checkHeader", "true")
.csv(path.getCanonicalPath)
.collect()
}
assert(exception.getMessage.contains(
"Fields in the header of csv file are not matched to field names of the schema"
))
Review comment (Member): tiny nit: shell we make this inlined if we need more other changes?

}
}
}

List("false", "true").foreach(checkHeader(_))
}