
Conversation

@HyukjinKwon (Member) commented Oct 9, 2018

What changes were proposed in this pull request?

  1. Move `CSVDataSource.makeSafeHeader` to `CSVUtils.makeSafeHeader` (as is).

    • Historically, in the first round of refactoring (which I did), I intended to put all CSV-specific handling (like options), filtering, header extraction, etc. in one place.

    • See `JsonDataSource`. Now `CSVDataSource` is quite consistent with `JsonDataSource`. Since CSV's code path is quite complicated, we had better match them as much as we can.

  2. Create `CSVHeaderChecker` and put the `enforceSchema` logic into it.

    • The header-checking and column-pruning logic was added (per #20894 and #21296), but some code, such as #22123, is duplicated.

    • Also, the header-checking code is scattered here and there. We had better put it in a single place; the current state is quite error-prone. See #22656.

  3. Move `CSVDataSource.checkHeaderColumnNames` to `CSVHeaderChecker.checkHeaderColumnNames` (as is).

    • Similar reasons as in 1 above.

How was this patch tested?

Existing tests should cover this.

@HyukjinKwon (Member Author) commented Oct 9, 2018

cc @cloud-fan since you're looking at JSON-related code and reviewed some of the related PRs, and @MaxGekk since you're looking into this area.

```scala
    tokenizer: CsvParser): Iterator[Array[String]] = {
  convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
  val handleHeader: () => Unit =
    () => if (shouldDropHeader) tokenizer.parseNext
```
HyukjinKwon (Member Author):

This is used in the schema inference path, where we don't check the header; here it only drops the header.

```scala
  convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>

  val handleHeader: () => Unit =
    () => headerChecker.checkHeaderColumnNames(tokenizer)
```
HyukjinKwon (Member Author):

This matches the code structure of `parseStream` and `parseIterator`, which are used in the multiLine and non-multiLine modes. A sketch of the overall pattern is below.
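
For context, here is a minimal sketch of that pattern, reconstructed from the snippets above rather than copied from the Spark source: a `convertStream`-style helper takes `handleHeader` as a closure and runs it exactly once, right after the stream is opened.

```scala
import java.io.InputStream
import com.univocity.parsers.csv.CsvParser

// Sketch only: the real method lives in Spark's CSV data source and differs in details.
def convertStreamSketch[T](
    inputStream: InputStream,
    tokenizer: CsvParser,
    handleHeader: () => Unit)(convert: Array[String] => T): Iterator[T] = {
  tokenizer.beginParsing(inputStream)
  // Run once while the stream is open: the inference path passes a closure that
  // drops the header; the parsing path passes one that validates it.
  handleHeader()
  new Iterator[T] {
    private var nextRecord = tokenizer.parseNext()
    override def hasNext: Boolean = nextRecord != null
    override def next(): T = {
      if (!hasNext) throw new NoSuchElementException("End of stream")
      val current = convert(nextRecord)
      nextRecord = tokenizer.parseNext()
      current
    }
  }
}
```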

```scala
/**
 * Generates a header from the given row which is null-safe and duplicate-safe.
 */
def makeSafeHeader(
```
HyukjinKwon (Member Author):

It's moved as is. (A sketch of what it does is below.)
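
For readers unfamiliar with this helper, here is a hedged sketch of what a null-safe, duplicate-safe header generator does; the naming scheme (`_cN` for empty names, an index suffix for duplicates) is assumed from Spark's conventions, not quoted from this diff:

```scala
// Sketch only; the real implementation is CSVUtils.makeSafeHeader.
def makeSafeHeaderSketch(row: Array[String], caseSensitive: Boolean): Array[String] = {
  val names = row.map(name => if (name == null) "" else name)
  val keys = if (caseSensitive) names else names.map(_.toLowerCase)
  val duplicates = keys.groupBy(identity).collect { case (k, vs) if vs.length > 1 => k }.toSet

  names.zipWithIndex.map { case (name, index) =>
    val key = if (caseSensitive) name else name.toLowerCase
    if (name.isEmpty) s"_c$index"                      // null/empty names become positional
    else if (duplicates.contains(key)) s"$name$index"  // duplicates get an index suffix
    else name
  }
}
```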

```scala
 *
 * @param columnNames names of CSV columns that must be checked against to the schema.
 */
private def checkHeaderColumnNames(columnNames: Array[String]): Unit = {
```
HyukjinKwon (Member Author):

It's moved as is, except for the parameters in its signature.

```scala
     | Header: ${columnNames.mkString(", ")}
     | Schema: ${fieldNames.mkString(", ")}
     |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
     |$source""".stripMargin)
```
HyukjinKwon (Member Author):

This is the only diff in this method. Previously it was

```scala
 |CSV file: $fileName""".stripMargin)
```

which is now generalized to `$source`, so the dataset-based path ends up producing the class of the source here. See https://github.com/apache/spark/pull/22676/files#diff-f70bda59304588cc3abfa3a9840653f4R512.
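
To make the effect concrete, here is a hedged illustration of the message this produces; the exact wording of the first line and the file path are assumptions, not quotes from this PR:

```scala
// Illustrative values; in Spark these come from the schema and the parsed header.
val fieldNames = Seq("id", "name")
val columnNames = Seq("id", "nmae")
val i = 1
// File-based reads pass something like this; dataset-based reads now pass
// s"CSV source: ${csvDataset.getClass.getCanonicalName}" instead.
val source = "CSV file: /tmp/people.csv" // hypothetical path
val message =
  s"""|CSV header does not conform to the schema.
      | Header: ${columnNames.mkString(", ")}
      | Schema: ${fieldNames.mkString(", ")}
      |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
      |$source""".stripMargin
println(message)
```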

@SparkQA commented Oct 9, 2018

Test build #97149 has finished for PR 22676 at commit 89f7911.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 9, 2018

Test build #97147 has finished for PR 22676 at commit 5690668.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • `class CSVHeaderChecker(`

```scala
 * if unknown or not applicable (for instance when the input is a dataset),
 * can be omitted.
 */
class CSVHeaderChecker(
```
Reviewer (Member):

Can this be private to the csv or spark packages? Or is it now part of a public API?

HyukjinKwon (Member Author):

It's under the execution package, which is meant to be private. Since it's accessed in DataFrameReader, it would otherwise need to be `private[sql]`, a modifier that was removed in SPARK-16964 for this very reason. (A small illustration is below.)
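
For readers outside the Spark codebase, a minimal illustration of what `private[sql]` scoping would mean here; the class name is hypothetical and the layout is assumed, not the actual source:

```scala
package org.apache.spark.sql.execution.datasources.csv

// Visible anywhere under org.apache.spark.sql (e.g. to DataFrameReader),
// but not to user code outside that package. SPARK-16964 dropped such
// modifiers in the execution package, treating the whole package as private.
private[sql] class HeaderCheckerExample
```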

@MaxGekk (Member) left a comment:

Definitely it looks better.

```scala
  parsedOptions.enforceSchema,
  sparkSession.sessionState.conf.caseSensitiveAnalysis)
  parsedOptions,
  source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
```
MaxGekk (Member):

Is it better to output more concrete info about the dataset? For example, toString outputs field names at least. I think it would help in log analysis.

HyukjinKwon (Member Author):

Makes sense. If that's just toString, I can of course fix it here since the change is small, although it's orthogonal. (A hedged comparison of the two outputs is sketched below.)
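
For illustration, here is a self-contained sketch of the difference being discussed; the exact output strings in the comments are expected shapes, not verified quotes:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
import spark.implicits._

val ds = Seq((1, "a")).toDF("id", "name")
println(s"CSV source: ${ds.getClass.getCanonicalName}") // e.g. "CSV source: org.apache.spark.sql.Dataset"
println(s"CSV source: $ds")                             // e.g. "CSV source: [id: int, name: string]"
spark.stop()
```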

```scala
  } else {
    filteredLines.rdd
  }
}.getOrElse(filteredLines.rdd)
```
MaxGekk (Member):

It is not directly related to your changes, but just in case: why do we convert the Dataset to an RDD here?

HyukjinKwon (Member Author):

I don't exactly remember. Looks like we can change it to Dataset.

```scala
 * if unknown or not applicable (for instance when the input is a dataset),
 * can be omitted.
 */
class CSVHeaderChecker(
```
MaxGekk (Member):

Is the CSV prefix of `CSVHeaderChecker` necessary? The class is in the csv package already, so it should be clear that it checks CSV headers.

HyukjinKwon (Member Author):

Let's leave it as is. It follows the existing naming convention within each datasource.

```scala
  parsedOptions)
val schema = if (columnPruning) requiredSchema else dataSchema
val headerChecker = new CSVHeaderChecker(
  schema, parsedOptions, source = s"CSV file: ${file.filePath}", file.start == 0)
```
MaxGekk (Member):

Suggestion: name the boolean argument at the call site, i.e. `isStartOfFile = file.start == 0`.

```scala
  }

  // We can handle the header here, since the stream is open at this point.
  handleHeader()
```
MaxGekk (Member):

It looks slightly strange that we consume data from the input before the upper layer starts reading it.

HyukjinKwon (Member Author):

It is, but I believe it was already done this way.

```scala
    parser: UnivocityParser,
    headerChecker: CSVHeaderChecker,
    schema: StructType): Iterator[InternalRow] = {
  headerChecker.checkHeaderColumnNames(lines, parser.tokenizer)
```
MaxGekk (Member):

The same question here. I would prefer to consume the input iterator lazily. This is one of the advantages of iterators: they perform an action only when you explicitly call hasNext or next, compared to collections, for example. (A sketch of the lazy approach is below.)
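
A minimal sketch of the lazy alternative being suggested (not Spark code; names are hypothetical): wrap the iterator so the header is consumed and checked only when the caller first touches it.

```scala
// Sketch: defer the header check until the iterator is first consumed.
def withLazyHeaderCheck[T](lines: Iterator[T])(checkHeader: T => Unit): Iterator[T] =
  new Iterator[T] {
    private var headerDone = false
    private def ensureHeader(): Unit =
      if (!headerDone) {
        if (lines.hasNext) checkHeader(lines.next()) // consume and validate the header row
        headerDone = true
      }
    override def hasNext: Boolean = { ensureHeader(); lines.hasNext }
    override def next(): T = { ensureHeader(); lines.next() }
  }

// Usage: nothing is read from `raw` until the first hasNext/next call.
val raw = Iterator("id,name", "1,a", "2,b")
val rows = withLazyHeaderCheck(raw)(h => require(h == "id,name", s"Bad header: $h"))
```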

HyukjinKwon (Member Author):

Ditto. It was already done this way. Let's keep the original path as is, since this PR only targets reorganizing the code.

```scala
case Some(firstRow) if firstRow != null =>
  val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
  val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
  val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
```
MaxGekk (Member):

What about importing it from CSVUtils? What is the reason to have the prefix here?

HyukjinKwon (Member Author):

Because most of the code here uses the `CSVUtils.`-prefixed form; I just followed that.

@SparkQA commented Oct 9, 2018

Test build #97162 has finished for PR 22676 at commit c504356.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member Author): retest this please

@SparkQA commented Oct 11, 2018

Test build #97235 has finished for PR 22676 at commit c504356.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member Author): retest this please

@SparkQA commented Oct 11, 2018

Test build #97240 has finished for PR 22676 at commit c504356.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member Author): Merged to master.

@HyukjinKwon (Member Author): Thank you @cloud-fan and @MaxGekk for reviewing this.

@asfgit closed this in 39872af Oct 12, 2018
@HyukjinKwon deleted the refactoring-csv branch October 16, 2018 12:41
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019

Closes apache#22676 from HyukjinKwon/refactoring-csv.

Authored-by: hyukjinkwon <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>