[SPARK-25684][SQL] Organize header related codes in CSV datasource #22676
Conversation
cc @cloud-fan since you're looking at JSON-related code and reviewed some of the related PRs, and @MaxGekk since you're looking into this area.
```diff
     tokenizer: CsvParser): Iterator[Array[String]] = {
-    convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
+    val handleHeader: () => Unit =
+      () => if (shouldDropHeader) tokenizer.parseNext
```
This is used in the schema inference path, where we don't check the header; here it only drops the header.
```diff
-    convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
+    val handleHeader: () => Unit =
+      () => headerChecker.checkHeaderColumnNames(tokenizer)
```
This matches the code structure of `parseStream` and `parseIterator`, which are used in multiLine and non-multiLine modes.
```scala
  /**
   * Generates a header from the given row which is null-safe and duplicate-safe.
   */
  def makeSafeHeader(
```
It's moved as-is.
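For readers skimming the thread, a rough self-contained sketch of what "null-safe and duplicate-safe" means here (simplified; the real `CSVUtils.makeSafeHeader` also honors case sensitivity and the CSV options):

```scala
object SafeHeaderSketch {
  // Null entries get positional fallback names; duplicated names get an
  // index suffix so downstream schema fields stay unique.
  def makeSafeHeader(row: Array[String]): Array[String] = {
    val duplicates = row.filter(_ != null)
      .groupBy(identity).collect { case (k, vs) if vs.length > 1 => k }.toSet
    row.zipWithIndex.map {
      case (null, i)                     => s"_c$i"
      case (name, i) if duplicates(name) => s"$name$i"
      case (name, _)                     => name
    }
  }

  def main(args: Array[String]): Unit = {
    println(makeSafeHeader(Array("id", null, "id", "name")).mkString(", "))
    // id0, _c1, id2, name
  }
}
```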
```scala
   *
   * @param columnNames names of CSV columns that must be checked against the schema.
   */
  private def checkHeaderColumnNames(columnNames: Array[String]): Unit = {
```
It's moved as-is, except for the parameters in its signature.
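A self-contained sketch of the core comparison this method performs (simplified; the real checker also handles length mismatches and builds the detailed error message quoted below):

```scala
object HeaderCheckSketch {
  // Returns the index of the first header/schema mismatch, if any.
  // Assumes both arrays have the same length.
  def firstMismatch(
      columnNames: Array[String],
      fieldNames: Array[String],
      caseSensitive: Boolean): Option[Int] =
    columnNames.indices.find { i =>
      val (h, f) = (columnNames(i), fieldNames(i))
      if (caseSensitive) h != f else !h.equalsIgnoreCase(f)
    }

  def main(args: Array[String]): Unit = {
    println(firstMismatch(Array("id", "Name"), Array("id", "name"), true))  // Some(1)
    println(firstMismatch(Array("id", "Name"), Array("id", "name"), false)) // None
  }
}
```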
```scala
          | Header: ${columnNames.mkString(", ")}
          | Schema: ${fieldNames.mkString(", ")}
          |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
          |$source""".stripMargin)
```
This is the only diff in this method. Previously it was:

```scala
          |CSV file: $fileName""".stripMargin)
```

The generalized `$source` ends up carrying the class of the source here; see https://github.com/apache/spark/pull/22676/files#diff-f70bda59304588cc3abfa3a9840653f4R512.
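Putting the pieces together, a self-contained sketch of how that message is assembled (the first message line and the sample values are assumptions, not copied from Spark):

```scala
object HeaderErrorMessageSketch {
  // Rebuilds the message from the hunk quoted above; the prefix line
  // "CSV header does not conform to the schema." is assumed.
  def message(columnNames: Array[String], fieldNames: Array[String],
              i: Int, source: String): String =
    s"""|CSV header does not conform to the schema.
        | Header: ${columnNames.mkString(", ")}
        | Schema: ${fieldNames.mkString(", ")}
        |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
        |$source""".stripMargin

  def main(args: Array[String]): Unit =
    println(message(Array("id", "Name"), Array("id", "name"), 1,
      "CSV file: /tmp/people.csv"))
}
```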
Test build #97149 has finished for PR 22676 at commit
Test build #97147 has finished for PR 22676 at commit
```scala
 * if unknown or not applicable (for instance when the input is a dataset),
 * can be omitted.
 */
class CSVHeaderChecker(
```
Can this be private to the csv or spark packages? Or is this now part of a public API?
It's under the execution package, which is meant to be private. Otherwise, since it's accessed from DataFrameReader, it would need to be `private[sql]`, and such modifiers were removed in SPARK-16964 for exactly this reason.
Definitely it looks better.
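As a side note for readers unfamiliar with Scala's package-qualified access modifiers, a minimal illustration (hypothetical package and class names):

```scala
package org.example.sql.execution.datasources.csv

// Visible anywhere under org.example.sql (e.g. a DataFrameReader analogue),
// but not to code outside that package subtree.
private[sql] class HeaderChecker

// Visible only within the csv package itself.
private[csv] class CsvOnlyHelper
```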
```diff
-      parsedOptions.enforceSchema,
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+      parsedOptions,
+      source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
```
Would it be better to output more concrete info about the dataset? For example, `toString` outputs field names at least. I think it would help in log analysis.
Makes sense. If it's just `toString`, of course I can fix it here since the change is small, although it's orthogonal.
```scala
      } else {
        filteredLines.rdd
      }
    }.getOrElse(filteredLines.rdd)
```
It is not directly related to your changes. Just in case, why do we convert the `Dataset` to an `RDD` here?
I don't exactly remember. Looks like we can change it to `Dataset`.
```scala
 * if unknown or not applicable (for instance when the input is a dataset),
 * can be omitted.
 */
class CSVHeaderChecker(
```
Is the `CSV` prefix of `CSVHeaderChecker` necessary? The class is in the `csv` package already, so it should be clear that it checks CSV headers.
Let's leave it as-is. It's the existing naming convention within each datasource.
```scala
        parsedOptions)
      val schema = if (columnPruning) requiredSchema else dataSchema
      val headerChecker = new CSVHeaderChecker(
        schema, parsedOptions, source = s"CSV file: ${file.filePath}", file.start == 0)
```
`isStartOfFile = file.start == 0`
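In other words, the suggestion is to name the positional boolean at the call site. A small self-contained illustration (hypothetical simplified signature, not the real CSVHeaderChecker):

```scala
object NamedArgSketch {
  final case class CSVHeaderCheckerLike(
      source: String, isStartOfFile: Boolean = false)

  def main(args: Array[String]): Unit = {
    val start = 0L
    // A bare `start == 0` at the call site reads as an opaque boolean;
    // naming the argument documents the intent inline.
    val checker = CSVHeaderCheckerLike(
      source = "CSV file: /tmp/people.csv",
      isStartOfFile = start == 0)
    println(checker)
  }
}
```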
```scala
    }

    // We can handle header here since here the stream is open.
    handleHeader()
```
It looks slightly strange that we consume data from the input before the upper layer starts reading it.
It is, but I guess it was already done this way.
```scala
      parser: UnivocityParser,
      headerChecker: CSVHeaderChecker,
      schema: StructType): Iterator[InternalRow] = {
    headerChecker.checkHeaderColumnNames(lines, parser.tokenizer)
```
The same question here. I would prefer to consume the input iterator lazily. That is one of the advantages of iterators: they perform an action only when you explicitly call `hasNext` or `next`, compared to collections, for example.
Ditto. It was already done this way. Let's keep the original path as-is, since this PR targets organizing the code.
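For completeness, the lazy alternative discussed here could look roughly like the following plain-Scala sketch; this is not what the PR does, since it deliberately keeps the existing eager behavior:

```scala
object LazyHeaderCheckSketch {
  // Wraps an iterator so `check` runs on the first hasNext/next call
  // instead of at construction time.
  def withLazyCheck[T](iter: Iterator[T])(check: () => Unit): Iterator[T] =
    new Iterator[T] {
      private var checked = false
      private def ensure(): Unit = if (!checked) { checked = true; check() }
      override def hasNext: Boolean = { ensure(); iter.hasNext }
      override def next(): T = { ensure(); iter.next() }
    }

  def main(args: Array[String]): Unit = {
    val lines = Iterator("a,b", "1,2")
    val out = withLazyCheck(lines)(() => println("header checked"))
    println("iterator constructed")   // prints before "header checked"
    out.foreach(println)
  }
}
```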
```diff
       case Some(firstRow) if firstRow != null =>
         val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-        val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+        val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
```
What about importing it from `CSVUtils`? What is the reason for the prefix here?
Because the code here mostly uses the `CSVUtils.`-prefixed form; I just followed that.
Test build #97162 has finished for PR 22676 at commit
retest this please
Test build #97235 has finished for PR 22676 at commit
retest this please
Test build #97240 has finished for PR 22676 at commit
Merged to master.
Thank you @cloud-fan and @MaxGekk for reviewing this.
Closes apache#22676 from HyukjinKwon/refactoring-csv.

Authored-by: hyukjinkwon <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
## What changes were proposed in this pull request?

1. Move `CSVDataSource.makeSafeHeader` to `CSVUtils.makeSafeHeader` (as is).
   - Historically, and at the first refactoring (which I did), I intended to put all CSV-specific handling (like options), filtering, header extraction, etc. there.
   - See `JsonDataSource`: now `CSVDataSource` is quite consistent with `JsonDataSource`. Since CSV's code path is quite complicated, we had better match them as much as we can.
2. Create `CSVHeaderChecker` and put the `enforceSchema` logic into it (a hedged sketch of the resulting API follows at the end of this description).
   - Header checking and column pruning were added (per [SPARK-23786][SQL] Checking column names of csv headers #20894 and [SPARK-24244][SQL] Passing only required columns to the CSV parser #21296), but some of the code, such as [SPARK-25134][SQL] Csv column pruning with checking of headers throws incorrect error #22123, was duplicated.
   - Also, the header-checking code was scattered here and there; we had better put it in a single place, as the scattering was quite error-prone. See [SPARK-25669][SQL] Check CSV header only when it exists #22656.
3. Move `CSVDataSource.checkHeaderColumnNames` to `CSVHeaderChecker.checkHeaderColumnNames` (as is).
   - Similar reasons as 1 above.

## How was this patch tested?

Existing tests should cover this.
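For reference, a hedged, plain-Scala approximation of the consolidated checker this change converges on; the signature and semantics are inferred from the hunks quoted in the thread and from the documented `enforceSchema` option, not copied from the source:

```scala
// Plain-Scala approximation of the consolidated checker; real Spark types
// (StructType, CSVOptions, CsvParser) are replaced with stand-ins so the
// sketch stays self-contained.
object CSVHeaderCheckerSketch {
  class CSVHeaderChecker(
      schemaFieldNames: Array[String],
      enforceSchema: Boolean,
      source: String,
      isStartOfFile: Boolean = false) {
    // In Spark, enforceSchema=true means "trust the given schema and skip
    // the name check"; names are validated only when enforcement is off,
    // and only for the partition that actually contains the header.
    def checkHeaderColumnNames(header: Array[String]): Unit =
      if (!enforceSchema && isStartOfFile &&
          !header.sameElements(schemaFieldNames))
        throw new IllegalArgumentException(
          s"CSV header does not conform to the schema.\n$source")
  }

  def main(args: Array[String]): Unit = {
    val checker = new CSVHeaderChecker(
      Array("id", "name"), enforceSchema = false,
      source = "CSV file: /tmp/people.csv", isStartOfFile = true)
    checker.checkHeaderColumnNames(Array("id", "name")) // passes silently
    println("header ok")
  }
}
```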