[SPARK-19610][SQL] Support parsing multiline CSV files #16976
Conversation
(I should clean this up more and look into it deeper. I left this open to run tests.)
Force-pushed from 3a303c7 to 8275d12
Test build #73062 has finished for PR 16976 at commit
Test build #73063 has finished for PR 16976 at commit
cc @NathanHowell, this basically resembles your JSON approach. Could you take a look and see if it makes sense please?
cc @cloud-fan and @rxin too.
Test build #73088 has finished for PR 16976 at commit
Test build #73089 has finished for PR 16976 at commit
Force-pushed from 32cffa7 to 2ddbf15
Test build #73105 has finished for PR 16976 at commit
Hmm, I understand the motivation for this, though my understanding is that with CSV one generally either avoids newlines in fields, or some implementations require quotes around field values containing newlines: https://en.wikipedia.org/wiki/Comma-separated_values#General_functionality
I initially left a similar comment in those JIRAs. However, there are quite a few similar JIRAs complaining about this, and the original CSV datasource tried to support this, although it was incorrectly implemented. This tries to match it with the JSON one at least, and it might be better to provide a way to process such CSV files. Actually, the current implementation requires quotes :). (I was told R supports this case too, actually.)
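(For illustration only — a hypothetical two-column file, not one from this PR — such an input relies on quoting to carry the embedded newline, and the wholeFile mode is what lets the datasource read it back as a single record:)
"a value spanning
two lines",1
scala> spark.read.option("wholeFile", true).csv("/tmp/multiline.csv")
This should yield one row whose first column contains the newline; without the quotes, or without the option, the second physical line would be treated as a separate (malformed) record.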
I see, fair enough. cc @falaki
cloud-fan left a comment
looks pretty good, left some minor comments
wow you are only 25?
I was.. in the last year :).
I assume this is exactly the same as https://github.com/apache/spark/pull/16976/files#diff-56fbd53c6ada276cb4930affe3720be3L77
Yup, just copied and pasted. Just checked that they are the same line by line.
nit: I'd prefer xxx.flatMap(line => parser.parse(line))
nit: it's safer to add a check
if (!hasNext) {
throw new NoSuchElementException("End of stream")
}
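(For illustration, a tiny generic sketch of the guard being suggested — a hypothetical helper, not the PR's code:)
def nextOrFail[T](iter: Iterator[T]): T = {
  if (!iter.hasNext) {
    throw new NoSuchElementException("End of stream")
  }
  iter.next()
}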
Per 4dc48e1, I ran a build with Scala 2.10.
the normal code path has a more complex way to drop the header; is it OK to simply drop the first parsed record here?
Yup, the existing code path processes line by line, so comments and empty lines had to be skipped manually because it threw an error when a line contained only a comment or was empty. This is handled nicely by Univocity, and the initial version of the CSV datasource actually did it this way.
can we just check the schema and result of df?
assert(df.schema.map(_.name) == Seq("string\"", "integer\n\n", ...))
checkAnswer(df, Row("this is a\nsimple\nstring", ...) :: Row ...)
writing df out, reading it back, and comparing the result doesn't mean df has the correct result.
Added.
Could this be csv.flatMap { lines => ...?
I think we have so far promoted the usage of mapPartitions and an iterator for better performance (not verified by myself, though). At least, these are used more commonly to my knowledge, and it keeps things consistent. I would like to keep it this way if you are fine with that.
it has better performance if you use mapPartitionsInternal; otherwise these two are exactly the same...
I see. Thank you. I didn't know.
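(For reference, a minimal sketch of the two forms being compared — lines and parser are hypothetical names, not the PR's code:)
val perRecord = lines.flatMap(line => parser.parse(line))
val perPartition = lines.mapPartitions(iter => iter.flatMap(line => parser.parse(line)))
// On the public RDD API these two behave the same; the saving mentioned above comes only
// from the internal mapPartitionsInternal variant, which skips some overhead of the public API.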
Does this work on 0 byte files?
Let me check and add a test.
It seems the original one is not working either. Can I maybe open another JIRA and handle both together (or in a follow-up)?
scala> spark.read.csv("a")
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:...)
It seems we need a bit of a fix, including other CSV paths.
Another JIRA is a good idea for the line-based parser; it could be fixed in this PR or separately. WholeFileCSVDataSource should be fixed in this PR before merging, since it's new functionality.
Sure, let me try.
Filed https://issues.apache.org/jira/browse/SPARK-19709 for the line-based one.
Might be nice to move this over to CodecStreams (with an appropriate name) and fix up other classes that use this pattern.
Should this return None instead of throwing an exception? The JSON parser does, though I'm not sure which approach is recommended. They should be consistent though.
I think returning None is better; the upper level (DataSource) will handle it and throw the exception
This is not related to this JIRA itself. I would like to avoid other additional changes.
Both subclasses return Some(...), so it would be clearer to change this to def infer(...): StructType and do the Option wrapping only in CSVFileFormat.inferSchema
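(A rough sketch of the suggested shape, with hypothetical class and parameter names rather than the PR's actual ones:)
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

// Subclasses return a concrete schema...
abstract class ExampleCSVDataSource {
  def infer(sparkSession: SparkSession, inputPaths: Seq[FileStatus]): StructType
}
// ...and the Option wrapping happens once at the call site, e.g. in inferSchema:
//   Some(dataSource.infer(sparkSession, files))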
This was copied from JsonDataSource.infer. I would like to keep this for the case when we deal with #16976 (comment) and maybe introduce another parent, or at least for consistency.
Sounds good to me. Reducing code duplication between the JSON and CSV parsers would be great.
This should also be tested on empty files, if it's not already.
Yea, indeed it is an issue. #16976 (comment)
Test build #73334 has finished for PR 16976 at commit
I just used if-else here because the getOrElse approach seems to make this hard to read, to me.
Thanks for your review @NathanHowell and @cloud-fan. I just addressed them and ran a build with Scala 2.10 per 8474933.
Test build #73336 has finished for PR 16976 at commit
The default quote is ". This behaviour is definitely the same as with the original CSV.
scala> Seq("\"1\"").toDF.write.text("/tmp/abc")
scala> spark.read.option("header", true).csv("/tmp/abc").show()
+---+
| 1|
+---+
+---+
scala> spark.read.csv("/tmp/abc").show()
+---+
|_c0|
+---+
| 1|
+---+
so the string" will just become string?
Yup.
"
string"
becomes \nstring. It requires quotes for newlines.
Test build #73341 has finished for PR 16976 at commit
Test build #73417 has finished for PR 16976 at commit
Test build #73423 has finished for PR 16976 at commit
this doesn't convey the difference between this method and createInputStream(config: Configuration, file: Path).
We also need a better name for this method.
Sure, let me try to clean this up along with the other comments.
nit: we can write createInputStream(config, new Path(path)) directly
nit:
val rdd = new BinaryFileRDD ...
rdd.setName(..).values
is more readable
BTW, where is values defined?
(it seems defined here)
what's expected?
nit: wholeFile instead of bool
why do we test the write path here? wholeFile is only about the read path
I meant to test that we are able to read this back without the wholeFile option if we remove all newlines.
// Check if the rows are the same if we remove all white spaces.
val readBack = spark.read.csv(csvDir)
checkAnswer(dfWithoutWhiteSpaces, readBack)
Let me clean up this one too.
@cloud-fan, I am running a build with 2.10 per b4e6983. I think it is ready for another look.
Here also I simply added:
Seq(false, true).foreach { wholeFile =>
  ...
    .option("wholeFile", wholeFile)
  ...
}
Test build #73454 has finished for PR 16976 at commit
Test build #73455 has finished for PR 16976 at commit
Test build #73456 has finished for PR 16976 at commit
Test build #73457 has finished for PR 16976 at commit
Force-pushed from b7d56c6 to 1492d00
Force-pushed from 1492d00 to 22eb29e
Test build #73493 has finished for PR 16976 at commit
| test("Empty file produces empty dataframe with empty schema - wholeFile option") { | ||
| withTempPath { path => | ||
| path.createNewFile() |
actually you want withTempDir
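(For context, a rough sketch of the difference between the two helpers in Spark's SQLTestUtils; the bodies here are illustrative only:)
// withTempPath hands the test a path that has not been created yet,
// so the test itself has to call createNewFile() to get a zero-byte file.
withTempPath { path => path.createNewFile() }
// withTempDir hands the test a directory that already exists and is cleaned up afterwards.
withTempDir { dir => new java.io.File(dir, "empty.csv").createNewFile() }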
thanks, merging to master!
* <li>`columnNameOfCorruptRecord` (default is the value specified in
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines.</li>
multiple lines. -> multiple lines, per file.
No, they are different. JSON reads the whole file as one record (basically; if it is an array, then each element becomes an individual record), whereas CSV reads record by record and allows multiple lines within a column.
I am still unable to get your point. Could you give an example?
Definitely. I wanted to emphasize that "multiple lines" is not per file.
For example, CSV reads multiple records (each spanning multiple lines) per file (newlines are replaced with \n manually for readability).
"I am
Hyukjin Kwon"
"Hyukjin Kwon
I love Spark!"
scala> spark.read.option("wholeFile", true).csv("test.csv").show()
+---------------------+
| _c0|
+---------------------+
| I am\nHyukjin Kwon|
|Hyukjin Kwon\nI lo...|
+---------------------+
Whereas JSON reads one record per file. I am pretty sure object-root support is the primary case.
{
"I am": "HyukjinKwon",
"HyukjinKwon": "I love Spark!"
}
scala> spark.read.option("wholeFile", true).json("test.json").show()
+-------------+-----------+
| HyukjinKwon| I am|
+-------------+-----------+
|I love Spark!|HyukjinKwon|
+-------------+-----------+
But note that it could (in terms of input/output) work similarly to CSV when the input is a JSON array.
[{
"I am": "HyukjinKwon",
"HyukjinKwon": "I love Spark!"
},{
"I am": "HyukjinKwon",
"HyukjinKwon": "I love Spark!"
}]
scala> spark.read.option("wholeFile", true).json("test.json").show()
+-------------+-----------+
| HyukjinKwon| I am|
+-------------+-----------+
|I love Spark!|HyukjinKwon|
|I love Spark!|HyukjinKwon|
+-------------+-----------+
Comparing the array case and CSV, they still work differently. JSON, to my knowledge, parses the whole file and then produces each record (in the case of an array), whereas CSV parses record by record from the stream.
Then the option wholeFile is misleading to end users for CSV. We should rename it to multiLine or something else.
Yea, they are similar and different. I wouldn't mind opening a JIRA for this.
In JSON, when wholeFile is on, we parse one and only one record (i.e., the table rows) per file. The semantics are different. Let us fix the option name for CSV in 2.2.
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
:param wholeFile: parse one record, which may span multiple lines. If None is
The same here.
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
:param wholeFile: parse records, which may span multiple lines. If None is
The same here.
* <li>`columnNameOfCorruptRecord` (default is the value specified in
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines.</li>
The same here.
primitiveFieldAndType.toDF("value").coalesce(1).write.text(path.getAbsolutePath)

val df = spark.read
  .option("header", true)
When we have a set of files, how can we guarantee the reading order is what the user expects? That is, that the first file we read is the file that contains the header?
Yea, that might be an issue. I guess this could be a general issue for the CSV datasource itself. It sounds worth checking how it works.
Let us check it out and open an issue. I will try my best to help verify and resolve it together.
For non-whole-file parsing, we should skip the first line for each CSV file (after we combine the partitioned files into a single file), right?
Ah, sorry, I got you. So you mean what happens if the other files still have headers? They will be skipped; the Univocity parser handles this (did I understand correctly?)
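(For illustration only, not the PR's code: with the plain univocity-parsers API, header extraction is configured on the parser settings, so each file parsed with such settings has its own first record consumed as the header rather than returned as data:)
import java.io.{File, FileReader}
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

val settings = new CsvParserSettings()
settings.setHeaderExtractionEnabled(true)  // treat the first record of each input as the header

val parser = new CsvParser(settings)
// hypothetical path; each file is parsed independently, so its own header line is skipped
val rows = parser.parseAll(new FileReader(new File("/tmp/csv/part-00000.csv")))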
Don't think it quite works.
spark.read.option("wholeFile", "true").option("header", "true").option("inferSchema", "true").csv("FILE.csv").show()
spark.read.option("wholeFile", "true").option("header", "true").option("inferSchema", "true").csv("data/FILE.csv").count()
WHILE:
Can you open a JIRA with more description, expected input, and expected output?
+1, I could not get this to work either @raviolli. Does it need the Univocity parser to be specified, as suggested initially here: https://github.com/databricks/spark-csv/issues/175 ?
The wholeFile option doesn't seem to be working. Command and result:
+--------+--------------+--------+
Expected result:
Spark version used: 2.2
Using Python version 2.7.5 (default, May 3 2017 07:55:04)
Please let me know if I am missing something here
It was renamed to multiLine.
Guys, please use the mailing list for questions and JIRA for filing issues ... it's not useful to raise arbitrary questions and issues about this option here. BTW, please read the API documentation first and provide a self-contained reproducer if possible. For a question, please read SSCCE (http://sscce.org/) first.
Thanks Kwon. It worked.
What changes were proposed in this pull request?
This PR proposes support for multiline records in CSV, resembling the multiline support in the JSON datasource (which, in the case of JSON, is per file).
So, this PR introduces a wholeFile option, which makes the format not splittable and reads each whole file. Since the Univocity parser can produce each row from a stream, it should be capable of parsing very large documents as long as the internal rows fit in memory.
How was this patch tested?
Unit tests in CSVSuite and tests.py. Manual tests with a single 9GB CSV file in the local file system, for example,
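(A plausible shape of such a manual test, shown for illustration only since the exact command is not included above; the path is hypothetical:)
scala> spark.read.option("wholeFile", true).csv("/tmp/large_9gb_multiline.csv").count()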