From dd2ed3dce09dfd4c68dffe6fab831469273f7d5e Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Thu, 7 Dec 2017 18:37:21 +0530
Subject: [PATCH 1/4] [SPARK-16496][SQL] Add wholetext as option for reading
 text in SQL.

---
 python/pyspark/sql/readwriter.py              |  7 ++-
 .../apache/spark/sql/DataFrameReader.scala    | 16 +++++-
 .../HadoopFileWholeTextReader.scala           | 57 +++++++++++++++++++
 .../datasources/text/TextFileFormat.scala     | 31 ++++++++--
 .../datasources/text/TextOptions.scala        |  7 +++
 .../datasources/text/TextSuite.scala          | 53 ++++++++++++++++-
 6 files changed, 161 insertions(+), 10 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 1ad974e9aa4c7..49c0e25e85873 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -304,7 +304,7 @@ def parquet(self, *paths):
 
     @ignore_unicode_prefix
     @since(1.6)
-    def text(self, paths):
+    def text(self, paths, wholetext=False):
         """
         Loads text files and returns a :class:`DataFrame` whose schema starts with a
         string column named "value", and followed by partitioned columns if there
@@ -313,11 +313,16 @@ def text(self, paths, wholetext=False):
         Each line in the text file is a new row in the resulting DataFrame.
 
         :param paths: string, or list of strings, for input path(s).
+        :param wholetext: if true, read each file from input path(s) as a single row.
 
         >>> df = spark.read.text('python/test_support/sql/text-test.txt')
         >>> df.collect()
         [Row(value=u'hello'), Row(value=u'this')]
+        >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
+        >>> df.collect()
+        [Row(value=u'hello\nthis')]
         """
+        self._set_opts(wholetext=wholetext)
         if isinstance(paths, basestring):
             paths = [paths]
         return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ea1cf66775235..39fec8f983b65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -646,7 +646,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * Loads text files and returns a `DataFrame` whose schema starts with a string column named
    * "value", and followed by partitioned columns if there are any.
    *
-   * Each line in the text files is a new row in the resulting DataFrame. For example:
+   * You can set the following text-specific option(s) for reading text files:
+   * <ul>
+   * <li>`wholetext` (default `false`): If true, read a file as a single row.</li>
+   * </ul>
+   *
+   * By default, each line in the text files is a new row in the resulting DataFrame.
+   *
+   * Usage example:
    * {{{
    *   // Scala:
    *   spark.read.text("/path/to/spark/README.md")
@@ -678,7 +685,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * If the directory structure of the text files contains partitioning information, those are
    * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
    *
-   * Each line in the text files is a new element in the resulting Dataset. For example:
+   * You can set the following textFile-specific option(s) for reading text files:
+   * <ul>
+   * <li>`wholetext` (default `false`): If true, read a file as a single row.</li>
+   * </ul>
+   *
+   * By default, each line in the text files is a new element in the resulting Dataset. For example:
   * {{{
   *   // Scala:
   *   spark.read.textFile("/path/to/spark/README.md")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
new file mode 100644
index 0000000000000..c61a89e6e8c3f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.Closeable
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.input.WholeTextFileRecordReader
+
+/**
+ * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which yields the entire
+ * content of that file as a single record.
+ */
+class HadoopFileWholeTextReader(file: PartitionedFile, conf: Configuration)
+  extends Iterator[Text] with Closeable {
+  private val iterator = {
+    val fileSplit = new CombineFileSplit(
+      Array(new Path(new URI(file.filePath))),
+      Array(file.start),
+      Array(file.length),
+      // TODO: Implement Locality
+      Array.empty[String])
+    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+    val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+    val reader = new WholeTextFileRecordReader(fileSplit, hadoopAttemptContext, 0)
+    reader.initialize(fileSplit, hadoopAttemptContext)
+    new RecordReaderIterator(reader)
+  }
+
+  override def hasNext: Boolean = iterator.hasNext
+
+  override def next(): Text = iterator.next()
+
+  override def close(): Unit = iterator.close()
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index d0690445d7672..8a6ab303fc0f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -17,12 +17,16 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
+import java.io.Closeable
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
@@ -53,6 +57,14 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
   }
 
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    val textOptions = new TextOptions(options)
+    super.isSplitable(sparkSession, options, path) && !textOptions.wholeText
+  }
+
   override def inferSchema(
       sparkSession: SparkSession,
       options: Map[String, String],
@@ -97,14 +109,25 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
     assert(
       requiredSchema.length <= 1,
       "Text data source only produces a single data column named \"value\".")
-
+    val textOptions = new TextOptions(options)
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
+    readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions.wholeText)
+  }
+
+  private def readToUnsafeMem(conf: Broadcast[SerializableConfiguration],
+      requiredSchema: StructType, wholeTextMode: Boolean):
+    (PartitionedFile) => Iterator[UnsafeRow] = {
+
     (file: PartitionedFile) => {
-      val reader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+      val confValue = conf.value.value
+      val reader = if (!wholeTextMode) {
+        new HadoopFileLinesReader(file, confValue)
+      } else {
+        new HadoopFileWholeTextReader(file, confValue)
+      }
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close()))
-
       if (requiredSchema.isEmpty) {
         val emptyUnsafeRow = new UnsafeRow(0)
         reader.map(_ => emptyUnsafeRow)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index 49bd7382f9cf3..2a661561ab51e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -33,8 +33,15 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    * Compression codec to use.
    */
   val compressionCodec = parameters.get(COMPRESSION).map(CompressionCodecs.getCodecClassName)
+
+  /**
+   * wholetext - If true, read each file as a single row rather than splitting on "\n".
+   */
+  val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
+
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
+  val WHOLETEXT = "wholetext"
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index cb7393cdd2b9d..59a8878b1a568 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -39,6 +39,54 @@ class TextSuite extends QueryTest with SharedSQLContext {
     verifyFrame(spark.read.text(testFile))
   }
 
+  test("reading text file with option wholetext=true") {
+    val df = spark.read.option("wholetext", "true")
+      .format("text").load(testFile)
+    // schema
+    assert(df.schema == new StructType().add("value", StringType))
+
+    // verify content
+    val data = df.collect()
+    assert(data(0) ==
+      Row(
+        // scalastyle:off nonascii
+        """This is a test file for the text data source
+          |1+1
+          |数据砖头
+          |"doh"
+          |""".stripMargin))
+    // scalastyle:on nonascii
+    assert(data.length == 1)
+  }
+
+  test("reading multiple text files with option wholetext=true") {
+    import org.apache.spark.sql.catalyst.util._
+    withTempDir { dir =>
+      val file1 = new File(dir, "text1.txt")
+      stringToFile(file1,
+        """text file 1 contents.
+          |From: None to: ??
+        """.stripMargin)
+      val file2 = new File(dir, "text2.txt")
+      stringToFile(file2, "text file 2 contents.")
+      val file3 = new File(dir, "text3.txt")
+      stringToFile(file3, "text file 3 contents.")
+      val df = spark.read.option("wholetext", "true").text(dir.getAbsolutePath)
+      // Since wholetext reads each file as a single row, the row count equals the number of files.
+      val data = df.sort("value").collect()
+      assert(data.length == 3)
+      // Each file should be represented as a single Row in the DataFrame/Dataset.
+      assert(data(0) == Row(
+        """text file 1 contents.
+          |From: None to: ??
+        """.stripMargin))
+      assert(data(1) == Row(
+        """text file 2 contents.""".stripMargin))
+      assert(data(2) == Row(
+        """text file 3 contents.""".stripMargin))
+    }
+  }
+
   test("SPARK-12562 verify write.text() can handle column name beyond `value`") {
     val df = spark.read.text(testFile).withColumnRenamed("value", "adwrasdf")
 
@@ -185,10 +233,9 @@ class TextSuite extends QueryTest with SharedSQLContext {
     val data = df.collect()
     assert(data(0) == Row("This is a test file for the text data source"))
     assert(data(1) == Row("1+1"))
-    // non ascii characters are not allowed in the code, so we disable the scalastyle here.
-    // scalastyle:off
+    // scalastyle:off nonascii
     assert(data(2) == Row("数据砖头"))
-    // scalastyle:on
+    // scalastyle:on nonascii
     assert(data(3) == Row("\"doh\""))
     assert(data.length == 4)
   }

From 7e9102040ee9d6817288b5712e4c3c353390fba1 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Fri, 8 Dec 2017 11:05:27 +0530
Subject: [PATCH 2/4] Try out escaping slash.

---
 python/pyspark/sql/readwriter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 49c0e25e85873..4e58bfb843644 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -320,7 +320,7 @@ def text(self, paths, wholetext=False):
         [Row(value=u'hello'), Row(value=u'this')]
         >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
         >>> df.collect()
-        [Row(value=u'hello\nthis')]
+        [Row(value=u'hello\\nthis')]
         """
         self._set_opts(wholetext=wholetext)
         if isinstance(paths, basestring):

From 66d5b453cd2aaaea08a3843f4966fc9036451b6c Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Mon, 11 Dec 2017 15:12:14 +0530
Subject: [PATCH 3/4] Added a WholeTextFileSuite, covering more cases from the
 corresponding RDD version of the option.

---
 .../datasources/text/TextSuite.scala          |  48 --------
 .../datasources/text/WholeTextFileSuite.scala | 108 ++++++++++++++++++
 2 files changed, 108 insertions(+), 48 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 59a8878b1a568..33287044f279e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -39,54 +39,6 @@ class TextSuite extends QueryTest with SharedSQLContext {
     verifyFrame(spark.read.text(testFile))
   }
 
-  test("reading text file with option wholetext=true") {
-    val df = spark.read.option("wholetext", "true")
-      .format("text").load(testFile)
-    // schema
-    assert(df.schema == new StructType().add("value", StringType))
-
-    // verify content
-    val data = df.collect()
-    assert(data(0) ==
-      Row(
-        // scalastyle:off nonascii
-        """This is a test file for the text data source
-          |1+1
-          |数据砖头
-          |"doh"
-          |""".stripMargin))
-    // scalastyle:on nonascii
-    assert(data.length == 1)
-  }
-
-  test("reading multiple text files with option wholetext=true") {
-    import org.apache.spark.sql.catalyst.util._
-    withTempDir { dir =>
-      val file1 = new File(dir, "text1.txt")
-      stringToFile(file1,
-        """text file 1 contents.
-          |From: None to: ??
-        """.stripMargin)
-      val file2 = new File(dir, "text2.txt")
-      stringToFile(file2, "text file 2 contents.")
-      val file3 = new File(dir, "text3.txt")
-      stringToFile(file3, "text file 3 contents.")
-      val df = spark.read.option("wholetext", "true").text(dir.getAbsolutePath)
-      // Since wholetext reads each file as a single row, the row count equals the number of files.
-      val data = df.sort("value").collect()
-      assert(data.length == 3)
-      // Each file should be represented as a single Row in the DataFrame/Dataset.
-      assert(data(0) == Row(
-        """text file 1 contents.
-          |From: None to: ??
- """.stripMargin)) - assert(data(1) == Row( - """text file 2 contents.""".stripMargin)) - assert(data(2) == Row( - """text file 3 contents.""".stripMargin)) - } - } - test("SPARK-12562 verify write.text() can handle column name beyond `value`") { val df = spark.read.text(testFile).withColumnRenamed("value", "adwrasdf") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala new file mode 100644 index 0000000000000..ef26f965f53c8 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.text + +import java.io.File + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{StringType, StructType} + +class WholeTextFileSuite extends QueryTest with SharedSQLContext { + + // Hadoop's FileSystem caching does not use the Configuration as part of its cache key, which + // can cause Filesystem.get(Configuration) to return a cached instance created with a different + // configuration than the one passed to get() (see HADOOP-8490 for more details). This caused + // hard-to-reproduce test failures, since any suites that were run after this one would inherit + // the new value of "fs.local.block.size" (see SPARK-5227 and SPARK-5679). To work around this, + // we disable FileSystem caching in this suite. + protected override def sparkConf = + super.sparkConf.set("spark.hadoop.fs.file.impl.disable.cache", "true") + + private def testFile: String = { + Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString + } + + test("reading text file with option wholetext=true") { + val df = spark.read.option("wholetext", "true") + .format("text").load(testFile) + // schema + assert(df.schema == new StructType().add("value", StringType)) + + // verify content + val data = df.collect() + assert(data(0) == + Row( + // scalastyle:off nonascii + """This is a test file for the text data source + |1+1 + |数据砖头 + |"doh" + |""".stripMargin)) + // scalastyle:on nonascii + assert(data.length == 1) + } + + test("correctness of wholetext option") { + import org.apache.spark.sql.catalyst.util._ + withTempDir { dir => + val file1 = new File(dir, "text1.txt") + stringToFile(file1, + """text file 1 contents. + |From: None to: ?? 
+ """.stripMargin) + val file2 = new File(dir, "text2.txt") + stringToFile(file2, "text file 2 contents.") + val file3 = new File(dir, "text3.txt") + stringToFile(file3, "text file 3 contents.") + val df = spark.read.option("wholetext", "true").text(dir.getAbsolutePath) + // Since wholetext option reads each file into a single row, df.length should be no. of files. + val data = df.sort("value").collect() + assert(data.length == 3) + // Each files should represent a single Row/element in Dataframe/Dataset + assert(data(0) == Row( + """text file 1 contents. + |From: None to: ?? + """.stripMargin)) + assert(data(1) == Row( + """text file 2 contents.""".stripMargin)) + assert(data(2) == Row( + """text file 3 contents.""".stripMargin)) + } + } + + + test("Correctness of wholetext option with gzip compression mode.") { + withTempDir { dir => + val path = dir.getCanonicalPath + val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s") + df1.write.option("compression", "gzip").mode("overwrite").text(path) + + val expected = df1.collect() + Seq(10, 100, 1000).foreach { bytes => + withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> bytes.toString) { + val df2 = spark.read.option("wholetext", "true").format("text").load(path) + checkAnswer(df2, expected) + } + } + } + } + + +} From 021039bd1382392282faaec1e1f5c0d39e650a93 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Thu, 14 Dec 2017 12:03:07 +0530 Subject: [PATCH 4/4] fixed tests --- .../datasources/text/WholeTextFileSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala index ef26f965f53c8..8bd736bee69de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala @@ -91,18 +91,18 @@ class WholeTextFileSuite extends QueryTest with SharedSQLContext { test("Correctness of wholetext option with gzip compression mode.") { withTempDir { dir => val path = dir.getCanonicalPath - val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s") + val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s").repartition(1) df1.write.option("compression", "gzip").mode("overwrite").text(path) - - val expected = df1.collect() + // On reading through wholetext mode, one file will be read as a single row, i.e. not + // delimited by "next line" character. + val expected = Row(Range(0, 1000).mkString("", "\n", "\n")) Seq(10, 100, 1000).foreach { bytes => withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> bytes.toString) { val df2 = spark.read.option("wholetext", "true").format("text").load(path) - checkAnswer(df2, expected) + val result = df2.collect().head + assert(result === expected) } } } } - - }