[SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV #16854
Conversation
Let me try to address the comments further and double-check tomorrow.

Test build #72588 has finished for PR 16854 at commit
eabb3f3 to a7e8c2b
Just to help review, there is a similar code path in
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
(lines 105 to 125 at commit 3d314d0):
```scala
val lines = {
  val conf = broadcastedHadoopConf.value.value
  val linesReader = new HadoopFileLinesReader(file, conf)
  Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
  linesReader.map { line =>
    new String(line.getBytes, 0, line.getLength, csvOptions.charset)
  }
}
val linesWithoutHeader = if (csvOptions.headerFlag && file.start == 0) {
  // Note that if there are only comments in the first block, the header would probably
  // not be dropped.
  CSVUtils.dropHeaderLine(lines, csvOptions)
} else {
  lines
}
val filteredLines = CSVUtils.filterCommentAndEmpty(linesWithoutHeader, csvOptions)
val parser = new UnivocityParser(dataSchema, requiredSchema, csvOptions)
filteredLines.flatMap(parser.parse)
```
Test build #72623 has finished for PR 16854 at commit
Cc @cloud-fan, do you mind if I ask whether you think this API is worth adding?
Shall we also add `def json(lines: Dataset[String])`, and deprecate `json(r: RDD[String])` and `json(r: JavaRDD[String])`? (A rough sketch of the shape is below.)
cc @rxin
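For concreteness, a rough sketch of the suggested shape, written as a standalone trait since the real methods live in `org.apache.spark.sql.DataFrameReader` (the trait name and the deprecation version are illustrative assumptions):

```scala
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset}

// Sketch only: the Dataset[String] overload becomes the primary entry point,
// while the RDD-based overloads are kept for compatibility but deprecated.
trait JsonReaderSketch {
  def json(jsonDataset: Dataset[String]): DataFrame

  @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
  def json(jsonRDD: RDD[String]): DataFrame

  @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
  def json(jsonRDD: JavaRDD[String]): DataFrame
}
```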
Sure. Actually, there is a JIRA and a closed PR, SPARK-15615 and #13460, where I was negative because the use case can easily be worked around.
However, I am fine with it if we are promoting Datasets over RDDs for advantages like SPARK-18362 (where applicable).
cc @pjfanning, could you reopen and proceed with your PR if we all agree?
@HyukjinKwon I can look at resurrecting the pull request for SPARK-15615
@HyukjinKwon I added a new pull request because my original branch was deleted. #16895
Let me update this after #16976 gets merged, as that changes the related code paths substantially.
cc @cloud-fan, this is still a WIP, but I am trying to consolidate the different execution paths of CSV parsing here. For example:

- `spark.read.csv(file)`: data via `parseIterator` (note that this one reads from a partitioned file); schema via `tokenizeDataset`
- `spark.read.csv(file)` with `wholeFile`: data via `parseStream`; schema via `tokenizeStream`
- `spark.read.csv(dataset)`: data via `parseDataset`; schema via `tokenizeDataset`

However, it ends up with somewhat awkward arguments here. Do you think it is okay? (A usage sketch of the three paths follows below.)
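For reference, a minimal sketch of how the three paths above are triggered from the user-facing API, assuming a local `SparkSession`; the internal function names are not visible at this level:

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Path 1: line-based file input (internally parseIterator / tokenizeDataset).
val fromFile = spark.read.csv("data.csv")

// Path 2: multi-line records within one file (internally parseStream / tokenizeStream).
val wholeFile = spark.read.option("wholeFile", true).csv("data.csv")

// Path 3: the API this PR adds, reading from an in-memory Dataset[String]
// (internally parseDataset / tokenizeDataset).
val ds = spark.createDataset(Seq("0,1,2", "3,4,5"))(Encoders.STRING)
val fromDataset = spark.read.csv(ds)
```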
If you are not sure, or if it looks painful to review, let me take out all these changes and put them into `DataFrameReader.csv` for now. Otherwise, I will look into it further and see if I can generalise these more rather than just putting them together.
Test build #73756 has finished for PR 16854 at commit
does
Oh, no, it does not need to. I just meant to de-duplicate some logic via #16854 (comment). Let me remove that part and leave only the code changes dedicated to this JIRA; it seems to be confusing reviewers. Let me clean it up soon.
The reason this one exists, unlike for JSON, is that CSV always needs an initial read (even when it does not infer the schema, it needs at least the number of values). Given that, we can return an empty result fast here.
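To make that concrete, here is a minimal sketch of the early return under that assumption; `csvOrEmpty` and its parameters are illustrative, not the PR's actual code:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.types.StructType

// CSV must see one record up front to learn the number of columns; if no
// usable first line exists, short-circuit to an empty DataFrame immediately.
def csvOrEmpty(spark: SparkSession, lines: Dataset[String], schema: StructType): DataFrame =
  lines.take(1).headOption match {
    case Some(_) => spark.read.schema(schema).csv(lines)   // normal parsing path
    case None    => spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
  }
```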
There is a similar code path in
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
(lines 132 to 150 at commit 7e5359b):
```scala
override def infer(
    sparkSession: SparkSession,
    inputPaths: Seq[FileStatus],
    parsedOptions: CSVOptions): Option[StructType] = {
  val csv: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
  val firstLine: String = CSVUtils.filterCommentAndEmpty(csv, parsedOptions).first()
  val firstRow = new CsvParser(parsedOptions.asParserSettings).parseLine(firstLine)
  val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
  val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
  val tokenRDD = csv.rdd.mapPartitions { iter =>
    val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
    val linesWithoutHeader =
      CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions)
    val parser = new CsvParser(parsedOptions.asParserSettings)
    linesWithoutHeader.map(parser.parseLine)
  }
  Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
}
```
Then can we just call `TextInputCSVDataSource.infer` here?
Oh, sorry, I overlooked that. It seems `TextInputCSVDataSource.infer` takes input paths, whereas we want a `Dataset` here. Let me take a look and see if we can reuse it.
Maybe this is too much. I am willing to revert it.
Let me double check before getting rid of
Test build #73902 has finished for PR 16854 at commit
|
|
Test build #73903 has finished for PR 16854 at commit
|
`makeSafeHeader` was moved from the `CSVDataSource` class to the `CSVDataSource` companion object so that it can be accessed from `DataFrameReader`. (A small illustration of the pattern is below.)
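A minimal, hypothetical illustration of the pattern, not the actual Spark code: a method on the companion object can be called without an instance, so a class like `DataFrameReader` can reach it directly.

```scala
// Simplified stand-in for the refactoring: CsvSource here is illustrative.
class CsvSource {
  // instance members would require a CsvSource value to be called
}

object CsvSource {
  // Rough stand-in for makeSafeHeader: replace null or empty column names
  // with positional defaults, loosely mirroring Spark's `_c<index>` convention.
  def makeSafeHeader(row: Array[String]): Array[String] =
    row.zipWithIndex.map { case (name, i) =>
      if (name == null || name.isEmpty) s"_c$i" else name
    }
}

// Callable from anywhere without constructing a CsvSource:
val header = CsvSource.makeSafeHeader(Array("id", "", null))  // Array(id, _c1, _c2)
```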
Test build #73927 has finished for PR 16854 at commit

Test build #73928 has finished for PR 16854 at commit

@cloud-fan, I think this is ready for another look.

Test build #74017 has finished for PR 16854 at commit

Test build #74011 has finished for PR 16854 at commit
```scala
/**
 * Infers the schema from `Dataset` that stores CSV string records.
 */
def inferFromDataset(
```
There is almost no code modification here; it was just moved from above.
Test build #74092 has finished for PR 16854 at commit

Test build #74098 has finished for PR 16854 at commit

Test build #74101 has finished for PR 16854 at commit

Test build #74102 has finished for PR 16854 at commit
```scala
test("Empty dataframe produces empty dataframe") {
  // Empty dataframe with schema.
  val emptyDF = spark.createDataFrame(
```
Why do we create `emptyDF`? It looks like we only need a schema here.
Sure, let me fix it up.
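For illustration, a hedged sketch of a schema-only version of that test, assuming the new `spark.read.csv(Dataset[String])` API and a `spark` session in scope (not the exact test that was committed):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import spark.implicits._

// Build the expected schema directly instead of constructing an empty DataFrame.
val expectedSchema = new StructType().add("a", IntegerType).add("b", StringType)

// Reading an empty Dataset[String] with that schema should yield no rows
// while preserving the schema.
val df = spark.read.schema(expectedSchema).csv(spark.emptyDataset[String])
assert(df.schema == expectedSchema)
assert(df.collect().isEmpty)
```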
Test build #74200 has finished for PR 16854 at commit
Thanks, merging to master!
```scala
    CSVUtils.filterCommentAndEmpty(csvDataset, parsedOptions)
val maybeFirstLine: Option[String] = filteredLines.take(1).headOption
```

```scala
val schema = userSpecifiedSchema.getOrElse {
```
We should issue an error when users try to parse it with `wholeFile`.
We also need to check whether all the other CSV options are still honored by this API.
@gatorsmile, yes, we need to check, and for the JSON API too. Though, should we throw an error? It reminds me of `from_json`/`to_json`, which ignore parse modes.
We should not simply ignore the options without error messages. The options are not like hints.
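A minimal sketch of the kind of validation being discussed; `verifyOptionsForDatasetInput` is a hypothetical helper, not the actual Spark code:

```scala
// Fail fast when an option that cannot apply to an in-memory Dataset[String]
// (such as wholeFile) is set, instead of silently ignoring it.
def verifyOptionsForDatasetInput(options: Map[String, String]): Unit = {
  val unsupported = Seq("wholeFile").filter(key => options.get(key).exists(_.toBoolean))
  require(unsupported.isEmpty,
    s"Options [${unsupported.mkString(", ")}] are not applicable when reading CSV " +
      "from an existing Dataset[String]")
}

// Example: this would throw an IllegalArgumentException.
// verifyOptionsForDatasetInput(Map("wholeFile" -> "true"))
```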
What changes were proposed in this pull request?

This PR proposes to add an API that loads a `DataFrame` from a `Dataset[String]` storing CSV. It allows pre-processing before parsing the CSV, which enables workarounds for many narrow cases, for example, as sketched below:

Case 1 - pre-processing
Case 2 - use other input formats
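A hedged sketch of what such workarounds could look like with the new API (the file path and the `logs` table are illustrative assumptions, not the PR's original examples):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Case 1 - pre-processing: strip an unwanted prologue before parsing,
// which the file-based reader cannot do on its own.
val raw = spark.read.textFile("data.csv")
val cleaned = raw.filter(line => !line.startsWith("#!"))
val df1 = spark.read.option("header", "true").csv(cleaned)

// Case 2 - use other input formats: take CSV records that arrive through a
// different source (here, a table with a string payload column) and parse
// them without first writing them out to files.
val payloads = spark.table("logs").select($"payload").as[String]
val df2 = spark.read.csv(payloads)
```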
How was this patch tested?

Added tests in `CSVSuite` and built with Scala 2.10.