Conversation

@HyukjinKwon (Member) commented Feb 8, 2017

What changes were proposed in this pull request?

This PR proposes to add an API that loads a DataFrame from a Dataset[String] storing CSV.

It allows pre-processing before parsing CSV into a DataFrame, which enables workarounds for many narrow cases, for example as below (an end-to-end sketch follows these cases):

  • Case 1 - pre-processing

    val df = spark.read.text("...")
    // Pre-processing with this.
    spark.read.csv(df.as[String])
  • Case 2 - use other input formats

    val rdd = spark.sparkContext.newAPIHadoopFile("/file.csv.lzo",
      classOf[com.hadoop.mapreduce.LzoTextInputFormat],
      classOf[org.apache.hadoop.io.LongWritable],
      classOf[org.apache.hadoop.io.Text])
    val stringRdd = rdd.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength))
    
    spark.read.csv(stringRdd.toDS)
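
For concreteness, a minimal end-to-end sketch of how the proposed API would be used (the SparkSession setup and the sample data are illustrative, not from this PR):

import org.apache.spark.sql.SparkSession

object CsvFromDatasetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("csv-from-dataset")
      .getOrCreate()
    import spark.implicits._

    // CSV records held in a Dataset[String], e.g. produced by pre-processing.
    val csvLines = Seq("name,age", "Alice,30", "Bob,25").toDS()

    // The API proposed in this PR: parse a Dataset[String] as CSV.
    val df = spark.read.option("header", "true").csv(csvLines)
    df.show()

    spark.stop()
  }
}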

How was this patch tested?

Added tests in CSVSuite and built with Scala 2.10:

./dev/change-scala-version.sh 2.10
./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package

@HyukjinKwon changed the title [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] [WIP][SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] Feb 8, 2017
@HyukjinKwon (Member, Author) commented Feb 8, 2017

Let me try to address the comments further and double-check tomorrow.

@SparkQA commented Feb 8, 2017

Test build #72588 has finished for PR 16854 at commit eabb3f3.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class UnivocityParser(

@HyukjinKwon changed the title [WIP][SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] [WIP][SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV Feb 8, 2017
@HyukjinKwon (Member, Author)

Just to help review, there is a similar code path in:

val lines = {
  val conf = broadcastedHadoopConf.value.value
  val linesReader = new HadoopFileLinesReader(file, conf)
  Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
  linesReader.map { line =>
    new String(line.getBytes, 0, line.getLength, csvOptions.charset)
  }
}
val linesWithoutHeader = if (csvOptions.headerFlag && file.start == 0) {
  // Note that if there are only comments in the first block, the header would
  // probably not be dropped.
  CSVUtils.dropHeaderLine(lines, csvOptions)
} else {
  lines
}
val filteredLines = CSVUtils.filterCommentAndEmpty(linesWithoutHeader, csvOptions)
val parser = new UnivocityParser(dataSchema, requiredSchema, csvOptions)
filteredLines.flatMap(parser.parse)

@HyukjinKwon changed the title [WIP][SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV Feb 9, 2017
@SparkQA commented Feb 9, 2017

Test build #72623 has finished for PR 16854 at commit a7e8c2b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class UnivocityParser(

@HyukjinKwon (Member, Author) commented Feb 9, 2017

cc @cloud-fan, do you think it is worth adding this API?

Contributor

Shall we also add def json(lines: Dataset[String]), and deprecate json(r: RDD[String]) and json(r: JavaRDD[String])?

cc @rxin
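
For reference, a sketch of how the suggested JSON analogue would read (hedged; the Dataset-based JSON reader was tracked separately in SPARK-15615, discussed below). It assumes an existing SparkSession named spark:

import spark.implicits._

// JSON records held in a Dataset[String], mirroring this PR's csv(Dataset[String]).
val jsonLines = Seq("""{"a": 1}""", """{"a": 2}""").toDS()
val df = spark.read.json(jsonLines)
df.show()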

@HyukjinKwon (Member, Author) commented Feb 11, 2017

Sure. Actually, there is a JIRA and a closed PR, SPARK-15615 and #13460, where I was negative because it could easily be worked around.

However, I am fine with it if we are promoting Datasets over RDDs for advantages like SPARK-18362 (if applicable).

cc @pjfanning, could you reopen and proceed with your PR if we are all fine with that?

Member

@HyukjinKwon I can look at resurrecting the pull request for SPARK-15615.

Member

@HyukjinKwon I added a new pull request, #16895, because my original branch was deleted.

@HyukjinKwon (Member, Author)

Let me update this after #16976 gets merged, as that substantially changes the related code path.

@HyukjinKwon changed the title [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV [WIP][SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV Mar 2, 2017
@HyukjinKwon (Member, Author)

cc @cloud-fan, this is still WIP, but I am trying to consolidate the different execution paths in CSV parsing here.

For example,

  • spark.read.csv(file)

    • data: parseIterator (note that this one reads from a partitioned file).
    • schema: tokenizeDataset
  • spark.read.csv(file) with wholeFile

    • data: parseStream
    • schema: tokenizeStream
  • spark.read.csv(dataset)

    • data: parseDataset
    • schema: tokenizeDataset

However, it seems to end up with slightly awkward arguments here. Do you think it is okay? (A rough sketch of the dispatch follows below.)
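
To illustrate, a rough sketch of the dispatch described above (the path names come from the list; the types and signatures are simplified and hypothetical, not Spark's actual internals):

// How the input kind selects a (data, schema) pair of parsing paths.
sealed trait CsvInput
case class FileInput(path: String, wholeFile: Boolean) extends CsvInput
case class DatasetInput(lines: Seq[String]) extends CsvInput

def choosePaths(input: CsvInput): (String, String) = input match {
  case FileInput(_, false) => ("parseIterator", "tokenizeDataset")
  case FileInput(_, true)  => ("parseStream", "tokenizeStream")
  case DatasetInput(_)     => ("parseDataset", "tokenizeDataset")
}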

@HyukjinKwon (Member, Author) commented Mar 2, 2017

If you are not sure, or it looks painful to review, let me take out all the changes and put them into DataFrameReader.csv for now. Otherwise, I will take a further look and see if I can generalise these more rather than just putting them together.

@SparkQA commented Mar 2, 2017

Test build #73756 has finished for PR 16854 at commit de492b3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait CSVDataSource extends Serializable

@cloud-fan (Contributor)

Does def csv(csvDataset: Dataset[String]) need to support wholeFile? I think the JSON one doesn't support it either.

@HyukjinKwon (Member, Author)

Oh, no, it does not need to.

I just meant to de-duplicate some logic per #16854 (comment). Let me remove that part and leave only the code changes dedicated to this JIRA; it seems to be confusing reviewers. Let me clean it up soon.

@HyukjinKwon (Member, Author) commented Mar 4, 2017

The reason this one exists, unlike JSON, is that CSV always needs an initial read (even if it does not infer the schema, it needs at least the number of values). In this case, we can return an empty result fast. (A sketch follows below.)
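
A hedged sketch of that short-circuit over plain Scala collections (the real code operates on a Dataset[String]; the names here are illustrative):

// If filtering comments and empty lines leaves nothing, return an empty result
// immediately instead of attempting the initial read.
val filteredLines: Seq[String] = Seq.empty // e.g. the input held only comments
val maybeFirstLine: Option[String] = filteredLines.take(1).headOption

maybeFirstLine match {
  case Some(firstLine) =>
    println(s"Proceed: infer the number of values/schema from: $firstLine")
  case None =>
    println("Short-circuit: return an empty DataFrame with the known schema.")
}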

@HyukjinKwon (Member, Author)

There is a similar code path in:

override def infer(
    sparkSession: SparkSession,
    inputPaths: Seq[FileStatus],
    parsedOptions: CSVOptions): Option[StructType] = {
  val csv: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
  val firstLine: String = CSVUtils.filterCommentAndEmpty(csv, parsedOptions).first()
  val firstRow = new CsvParser(parsedOptions.asParserSettings).parseLine(firstLine)
  val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
  val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
  val tokenRDD = csv.rdd.mapPartitions { iter =>
    val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
    val linesWithoutHeader =
      CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions)
    val parser = new CsvParser(parsedOptions.asParserSettings)
    linesWithoutHeader.map(parser.parseLine)
  }
  Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
}

Contributor

Then can we just call TextInputCSVDataSource.infer here?

@HyukjinKwon (Member, Author)

Oh, sorry, I overlooked that. It seems TextInputCSVDataSource.infer takes input paths, whereas we want a Dataset here. Let me take a look and see if we could reuse it.

@HyukjinKwon (Member, Author)

Maybe this is too much. I am willing to revert it.

@HyukjinKwon (Member, Author)

Let me double-check tomorrow before getting rid of [WIP].

@SparkQA commented Mar 4, 2017

Test build #73902 has finished for PR 16854 at commit 859113a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class CSVOptions(
  • class UnivocityParser(

@SparkQA commented Mar 4, 2017

Test build #73903 has finished for PR 16854 at commit b806698.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author)

makeSafeHeader was moved from the CSVDataSource class to the CSVDataSource companion object so that it can be accessed from DataFrameReader.

@HyukjinKwon changed the title [WIP][SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV Mar 5, 2017
@SparkQA commented Mar 5, 2017

Test build #73927 has finished for PR 16854 at commit aed003e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 5, 2017

Test build #73928 has finished for PR 16854 at commit de08313.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class CSVDataSource extends Serializable

@HyukjinKwon changed the title [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV [WIP][SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV Mar 6, 2017
@HyukjinKwon changed the title [WIP][SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV Mar 6, 2017
@HyukjinKwon (Member, Author)

@cloud-fan, I think this is ready for another look.

@SparkQA commented Mar 6, 2017

Test build #74017 has finished for PR 16854 at commit af9bc6f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 6, 2017

Test build #74011 has finished for PR 16854 at commit 3a0401a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

/**
 * Infers the schema from `Dataset` that stores CSV string records.
 */
def inferFromDataset(
@HyukjinKwon (Member, Author)

There is almost no code modification here; it was just moved from above.

@SparkQA commented Mar 7, 2017

Test build #74092 has finished for PR 16854 at commit 92dfdf9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 7, 2017

Test build #74098 has finished for PR 16854 at commit a2739fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 7, 2017

Test build #74101 has finished for PR 16854 at commit a14df70.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 7, 2017

Test build #74102 has finished for PR 16854 at commit a0a79dc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


test("Empty dataframe produces empty dataframe") {
// Empty dataframe with schema.
val emptyDF = spark.createDataFrame(
Contributor

Why do we create emptyDF? It looks like we only need a schema here.

@HyukjinKwon (Member, Author)

Sure, let me fix it up.
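
A hedged sketch of the agreed simplification: build the schema directly rather than creating an empty DataFrame just to take its schema (the field names are hypothetical; CSVSuite's actual test may differ). It assumes a SparkSession named spark, as in the test suites:

import org.apache.spark.sql.types._
import spark.implicits._

// Hypothetical schema for the test.
val schema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StringType)))

// Read an empty Dataset[String] with this schema and check the result is empty.
val emptyDF = spark.read.schema(schema).csv(spark.emptyDataset[String])
assert(emptyDF.schema == schema)
assert(emptyDF.collect().isEmpty)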

@SparkQA commented Mar 8, 2017

Test build #74200 has finished for PR 16854 at commit 3f42c4c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

Thanks, merging to master!

@asfgit closed this in 4551290 Mar 8, 2017
val filteredLines: Dataset[String] =
  CSVUtils.filterCommentAndEmpty(csvDataset, parsedOptions)
val maybeFirstLine: Option[String] = filteredLines.take(1).headOption

val schema = userSpecifiedSchema.getOrElse {
@gatorsmile (Member)

We should issue an error when users try to parse it as wholeFile.

We also need to check whether all the other CSV options are still accepted by this API. (A sketch of such a check follows below.)
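
A hedged sketch of the kind of fail-fast validation being requested (validateDatasetOptions is hypothetical, not Spark's actual API):

// Reject options that cannot apply to a Dataset[String] input instead of
// silently ignoring them.
def validateDatasetOptions(options: Map[String, String]): Unit = {
  if (options.get("wholeFile").exists(_.toBoolean)) {
    throw new IllegalArgumentException(
      "wholeFile is not supported when reading CSV from a Dataset[String]")
  }
}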

@HyukjinKwon (Member, Author)

@gatorsmile, yes, we need to check, for the JSON API too. Though, should we throw an error? It reminds me of from_json/to_json, which ignore parse modes.

@gatorsmile (Member)

We should not simply ignore the options without error messages. The options are not like hints.

@HyukjinKwon deleted the SPARK-15463 branch January 2, 2018 03:44