
Conversation

@HyukjinKwon
Member

HyukjinKwon commented Feb 17, 2017

What changes were proposed in this pull request?

This PR proposes support for multiline records in CSV, resembling the multiline support in the JSON datasource (which, in JSON's case, is per file).

So, this PR introduces a wholeFile option which makes the format non-splittable and reads each file as a whole. Since the Univocity parser can produce rows one by one from a stream, it should be capable of parsing very large documents as long as the individual rows fit in memory.
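(For context, the streaming pattern this relies on looks roughly like the following sketch, using the univocity-parsers API directly; the file name and settings are illustrative, not taken from the PR:)

import java.io.FileInputStream
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

val settings = new CsvParserSettings()
val parser = new CsvParser(settings)
parser.beginParsing(new FileInputStream("tmp.csv"))
// parseNext() materializes one record at a time, so only the current row
// has to fit in memory, even for a very large file.
var row = parser.parseNext()
while (row != null) {
  row = parser.parseNext()
}
parser.stopParsing()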

How was this patch tested?

Unit tests in CSVSuite and tests.py

Manual tests with a single 9GB CSV file in the local file system, for example:

spark.read.option("wholeFile", true).option("inferSchema", true).csv("tmp.csv").count()

@HyukjinKwon
Member Author

(I should clean this up more and look into it deeper. I left this open to run the tests.)

@SparkQA

SparkQA commented Feb 17, 2017

Test build #73062 has finished for PR 16976 at commit 3a303c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class CSVDataSource extends Serializable

@SparkQA

SparkQA commented Feb 17, 2017

Test build #73063 has finished for PR 16976 at commit 8275d12.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class CSVDataSource extends Serializable

@HyukjinKwon changed the title [WIP][SPARK-19610][SQL] Support parsing multiline CSV files → [SPARK-19610][SQL] Support parsing multiline CSV files Feb 18, 2017
@HyukjinKwon
Member Author

HyukjinKwon commented Feb 18, 2017

cc @NathanHowell, this basically resembles your JSON approach. Could you take a look and see if it makes sense, please?

@HyukjinKwon
Member Author

HyukjinKwon commented Feb 18, 2017

cc @cloud-fan and @rxin too.

@SparkQA

SparkQA commented Feb 18, 2017

Test build #73088 has finished for PR 16976 at commit 44c9465.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 18, 2017

Test build #73089 has finished for PR 16976 at commit 373eec9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 18, 2017

Test build #73105 has finished for PR 16976 at commit 2ddbf15.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung
Member

hmm, I understand the motivation for this, though my understanding is that CSV generally either avoids newlines in fields, or some implementations require quotes around field values that contain newlines

https://en.wikipedia.org/wiki/Comma-separated_values#General_functionality
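(For illustration, a record with an embedded newline under that quoting convention looks like this; the values are made up:)

id,text
1,"a field with an
embedded newline"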

@HyukjinKwon
Member Author

HyukjinKwon commented Feb 19, 2017

I initially left a similar comment on the same JIRAs. However, there are quite a few similar JIRAs complaining about this, and the original CSV datasource tried to support this, although it was implemented incorrectly. This at least tries to match the JSON one, and it might be better to provide a way to process such CSV files. Actually, the current implementation requires quotes :). (I was told R supports this case too, actually.)

@felixcheung
Member

I see, fair enough. cc @falaki

Contributor

@cloud-fan left a comment

looks pretty good, left some minor comments

Contributor

wow you are only 25?

Member Author

I was... last year :).


Member Author

Yup, just copied and pasted. I just checked that they are the same line by line.

Contributor

nit: I'd prefer xxx.flatMap(line => parser.parse(line))

Contributor

nit: it's safer to add a check

if (!hasNext) {
  throw new NoSuchElementException("End of stream")
}

@HyukjinKwon
Member Author

Per 4dc48e1, I ran a build with Scala 2.10.

Contributor

the normal code path has a more complex way to drop the header; is it ok to simply drop the first parsed record here?

Member Author

Yup, the existing code path processes line by line, so comments and empty lines had to be skipped manually because it threw an error when a line contained only a comment or was empty. This is handled nicely by Univocity, and the initial version of the CSV datasource actually did it this way.

Contributor

can we just check the schema and result of df?

assert(df.schema.map(_.name) == Seq("string\"", "integer\n\n", ...))
checkAnswer(df, Row("this is a\nsimple\nstring", ...) :: Row ...)

writing df out, reading it back, and comparing the results doesn't prove that df has the correct result.

Member Author

Added.


Could this be csv.flatMap { lines => ...?

Member Author

I think we have so far promoted the use of mapPartitions and iterators for better performance (though I haven't verified this myself). At least, to my knowledge these are used more commonly, and it keeps things consistent. I would like to keep it this way if you are fine with that.

Contributor

it has better performance if you use mapPartitionsInternal; otherwise these two are exactly the same...
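(A minimal sketch of that equivalence; parse here is a hypothetical stand-in for the PR's per-line parser call:)

// Hypothetical stand-in for the parser invocation in the PR.
def parse(line: String): Iterator[String] = Iterator(line)

val lines = spark.sparkContext.textFile("tmp.csv")

// These two produce the same result; only the internal (non-public)
// mapPartitionsInternal variant avoids the extra overhead.
val viaFlatMap = lines.flatMap(parse)
val viaMapPartitions = lines.mapPartitions(_.flatMap(parse))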

Member Author

I see. Thank you. I didn't know.


Does this work on 0 byte files?

Member Author

Let me check and add a test.

Member Author

It seems the original one is not working either. Can I maybe open another JIRA and fix both together in a PR (or a follow-up)?

scala> spark.read.csv("a")
java.util.NoSuchElementException: next on empty iterator
	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
	at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.s

It seems we need a bit of a fix, including in other CSV paths.


Another JIRA is a good idea for the line-based parser; it could be fixed in this PR or separately. WholeFileCSVDataSource should be fixed in this PR before merging, since it's new functionality.

Member Author

Sure, let me try.


Might be nice to move this over to CodecStreams (with an appropriate name) and fix up other classes that use this pattern.


Should this return None instead of throwing an exception? The JSON parser does, though I'm not sure which approach is recommended. They should be consistent though.

Contributor

@cloud-fan Feb 23, 2017

I think returning None is better; the upper level (DataSource) will handle it and throw the exception

Member Author

This is not related to this JIRA itself. I would like to avoid additional unrelated changes.


Both subclasses return Some(...), so it would be clearer to change this to def infer(...): StructType and do the Option wrapping only in CSVFileFormat.inferSchema
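(A sketch of the suggested shape; the parameter lists are hypothetical, based on this discussion rather than the merged code, and CSVOptions refers to the PR's own options class:)

import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

abstract class CSVDataSource extends Serializable {
  // Subclasses always produce a schema, so return StructType directly...
  def infer(
      sparkSession: SparkSession,
      inputPaths: Seq[FileStatus],
      parsedOptions: CSVOptions): StructType
}

// ...and let only CSVFileFormat.inferSchema wrap the result:
// override def inferSchema(...): Option[StructType] =
//   Some(dataSource.infer(sparkSession, inputPaths, parsedOptions))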

Member Author

This was copied from JsonDataSource.infer. I would like to keep this for when we deal with #16976 (comment), and maybe introduce another parent, or at least for consistency.


Sounds good to me. Reducing code duplication between the JSON and CSV parsers would be great.


This should also be tested on empty files, if it's not already.

Member Author

Yea, indeed it is an issue. #16976 (comment)

@SparkQA

SparkQA commented Feb 23, 2017

Test build #73334 has finished for PR 16976 at commit 4dc48e1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member Author

I just used if-else here because the getOrElse version seemed hard to read to me.

@HyukjinKwon
Member Author

Thanks for your review, @NathanHowell and @cloud-fan. I just addressed the comments and ran a build with Scala 2.10 per 8474933.

@SparkQA

SparkQA commented Feb 23, 2017

Test build #73336 has finished for PR 16976 at commit e00bdee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member Author

@HyukjinKwon Feb 23, 2017

The default quote is ". This behaviour is definitely the same as in the original CSV datasource.

scala> Seq("\"1\"").toDF.write.text("/tmp/abc")

scala> spark.read.option("header", true).csv("/tmp/abc").show()
+---+
|  1|
+---+
+---+

scala> spark.read.csv("/tmp/abc").show()
+---+
|_c0|
+---+
|  1|
+---+

Contributor

so the string" will just become string?

Member Author

Yup.

"
string"

becomes \nstring. It requires quotes for newlines.

@SparkQA

SparkQA commented Feb 23, 2017

Test build #73341 has finished for PR 16976 at commit 8474933.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 24, 2017

Test build #73417 has finished for PR 16976 at commit fbdfc61.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 24, 2017

Test build #73423 has finished for PR 16976 at commit 793a3eb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

this doesn't explain the difference between this method and createInputStream(config: Configuration, file: Path).

We also need a better name for this method.

Member Author

Sure, let me try to clean this up and address the other comments.

Contributor

nit: we can write createInputStream(config, new Path(path)) directly

Contributor

nit:

val rdd = new BinaryFileRDD ...
rdd.setName(..).values

is more readable

Contributor

BTW, where is values defined?

Member Author

(it seems defined here)

Contributor

what's expected?

Contributor

nit: wholeFile instead of bool


Contributor

why do we test the write path here? wholeFile only affects the read path

Member Author

I meant to test that we are able to read this back without the wholeFile option if we remove all the newlines.

// Check if the rows are the same if we remove all white spaces.
val readBack = spark.read.csv(csvDir)
checkAnswer(dfWithoutWhiteSpaces, readBack)

Let me clean up this one too.

@HyukjinKwon
Member Author

@cloud-fan, I am running a build with 2.10 per b4e6983. I think it is ready for another look.

Member Author

Here I also simply added:

Seq(false, true).foreach { wholeFile =>
  ...
  .option("wholeFile", wholeFile)
  ...
}

@SparkQA

SparkQA commented Feb 25, 2017

Test build #73454 has finished for PR 16976 at commit 321d082.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 25, 2017

Test build #73455 has finished for PR 16976 at commit b4e6983.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 25, 2017

Test build #73456 has finished for PR 16976 at commit 6187f6c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 25, 2017

Test build #73457 has finished for PR 16976 at commit b7d56c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 26, 2017

Test build #73493 has finished for PR 16976 at commit 22eb29e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class CSVDataSource extends Serializable


test("Empty file produces empty dataframe with empty schema - wholeFile option") {
withTempPath { path =>
path.createNewFile()
Contributor

@cloud-fan Feb 28, 2017

actually you want withTempDir
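(For context, a sketch of the distinction, assuming Spark's SQLTestUtils helpers: withTempPath hands the block a path that does not exist yet, while withTempDir creates the directory before the block runs:)

withTempDir { dir =>
  val file = new java.io.File(dir, "empty.csv")
  file.createNewFile() // a genuinely empty, zero-byte file
  val df = spark.read.option("wholeFile", true).csv(file.getAbsolutePath)
  // Per the test's name, an empty file should yield an empty schema.
  assert(df.schema.isEmpty)
}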

@cloud-fan
Contributor

thanks, merging to master!

@asfgit closed this in 7e5359b Feb 28, 2017
* <li>`columnNameOfCorruptRecord` (default is the value specified in
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines.</li>
Member

@gatorsmile Jun 5, 2017

multiple lines. -> multiple lines, per file.

Member Author

@HyukjinKwon Jun 5, 2017

No, they are different. JSON reads the whole file as a record (basically; if it is an array, each element becomes an individual record), whereas CSV still reads record by record when a record spans multiple lines within a column.

Member

Still unable to get your point. Could you give an example?

Member Author

@HyukjinKwon Jun 5, 2017

Definitely. I wanted to emphasize that multiple lines does not mean per file.

For example, CSV reads multiple records (each spanning multiple lines) per file (newlines are replaced with \n manually for readability).

"I am
Hyukjin Kwon"
"Hyukjin Kwon
I love Spark!"
scala> spark.read.option("wholeFile", true).csv("test.csv").show()
+---------------------+
|                  _c0|
+---------------------+
|   I am\nHyukjin Kwon|
|Hyukjin Kwon\nI lo...|
+---------------------+

Whereas JSON reads one record per file. I am pretty sure the object-root case is the primary one.

{
  "I am": "HyukjinKwon",
  "HyukjinKwon": "I love Spark!"
}
scala> spark.read.option("wholeFile", true).json("test.json").show()
+-------------+-----------+
|  HyukjinKwon|       I am|
+-------------+-----------+
|I love Spark!|HyukjinKwon|
+-------------+-----------+

but note that, in terms of input/output, it can work similarly to CSV when the input is a JSON array.

[{
  "I am": "HyukjinKwon",
  "HyukjinKwon": "I love Spark!"
},{
  "I am": "HyukjinKwon",
  "HyukjinKwon": "I love Spark!"
}]
scala> spark.read.option("wholeFile", true).json("test.json").show()
+-------------+-----------+
|  HyukjinKwon|       I am|
+-------------+-----------+
|I love Spark!|HyukjinKwon|
|I love Spark!|HyukjinKwon|
+-------------+-----------+

Comparing the array case and CSV, they still work differently. JSON, to my knowledge, parses the whole file and then produces each record (in the array case), whereas CSV parses record by record from the stream.

Member

Then the option wholeFile is misleading to end users for CSV. We should rename it to multiLine or something similar.

Member Author

Yea, they are similar and different. I wouldn't mind opening a JIRA for this.

Member

@gatorsmile Jun 5, 2017

In JSON, when wholeFile is on, we parse one and only one record (i.e., the table rows) per file. The semantics are different. Let us fix the option name for CSV in 2.2.

``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
:param wholeFile: parse one record, which may span multiple lines. If None is
Member

The same here.

``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
:param wholeFile: parse records, which may span multiple lines. If None is
Member

The same here.

* <li>`columnNameOfCorruptRecord` (default is the value specified in
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines.</li>
Member

The same here.

primitiveFieldAndType.toDF("value").coalesce(1).write.text(path.getAbsolutePath)

val df = spark.read
  .option("header", true)
Member

When we have a set of files, how can we guarantee the reading order is what the users expect? That is, that the first file we read is the file that contains the header?

Member Author

Yea, that might be an issue. I guess this could be a general issue for the CSV datasource itself. It sounds worth checking how it works.

Member Author

Let us check it out and open an issue. I will do my best to help verify and resolve it.

Member

For non-whole-file parsing, we should skip the first line of each CSV file (after we combine the partitioned files into a single file), right?

Member Author

Ah, sorry, I get you now. So you mean, what if other files still have the headers? They will be skipped; the Univocity parser handles this (did I understand correctly?)
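(A small sketch of the scenario under discussion; the path and data are made up:)

// Writing with repartition(3) produces three part files, and with
// header=true each part file gets its own header row.
spark.range(10).selectExpr("id", "id * 2 AS twice")
  .repartition(3)
  .write.option("header", true).csv("/tmp/parts")

// On read, the header must be dropped from every part file, not just the
// first one encountered, so this should return exactly 10.
spark.read.option("header", true).csv("/tmp/parts").count()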

@raviolli

raviolli commented Sep 28, 2017

Don't think it quite works.

spark.read.option("wholeFile", "true").option("header", "true").option("inferSchema", "true").csv("FILE.csv").show()
the text is out of order

spark.read.option("wholeFile", "true").option("header", "true").option("inferSchema", "true").csv("data/FILE.csv").count()
311948

whereas pandas says the data count is 15860

@HyukjinKwon
Member Author

Can you open a JIRA with more description, expected input, and expected output?

@smram

smram commented Oct 4, 2017

don't think it quite works

+1, I could not get this to work either @raviolli

Does it need the univocity parser to be specified, as suggested initially here: https://github.com/databricks/spark-csv/issues/175?

@vishnusram

vishnusram commented Nov 16, 2017

The wholeFile option doesn't seem to be working.
Test file (new_line.csv) content:
"num_col1","txt_col","num_col2"
10001,"regular string",20001
10002,"string with
newline",20002

Command and result:

dfu = sqlContext.read.format('com.databricks.spark.csv').option("header","true").option("inferschema","true").option("delimiter",",").option("quote",'"').option("parserLib","univocity").option("wholeFile","true").load('new_line.csv')

dfu.show(3,False)

+--------+--------------+--------+
|num_col1|txt_col |num_col2|
+--------+--------------+--------+
|10001 |regular string|20001 |
|10002 |string with |null |
|newline"|20002 |null |
+--------+--------------+--------+

Expected result:
+--------+--------------+--------+
|num_col1|txt_col |num_col2|
+--------+--------------+--------+
|10001 |regular string|20001 |
|10002 |string with\nnewline|20002 |
+--------+--------------+--------+

Spark version used: 2.2
17/11/16 17:15:37 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 2.7.5 (default, May 3 2017 07:55:04)
SparkSession available as 'spark'.

Please let me know if I am missing something here.

@HyukjinKwon
Member Author

It was renamed to multiLine before the release. Could you try that instead?
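(For example, on Spark 2.2 the read above becomes the following; the file name follows the example above:)

spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("multiLine", "true")
  .csv("new_line.csv")
  .show(false)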

@HyukjinKwon
Member Author

HyukjinKwon commented Nov 16, 2017

Guys, please use the mailing list for questions and JIRA for filing issues ... it's not useful to raise arbitrary questions and issues about this option here.

BTW, please read the API documentation first, and provide a self-contained reproducer if possible. For a question, please read SSCCE (http://sscce.org/) first.

@vishnusram

Thanks Kwon. It worked.
