diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 1ad974e9aa4c7..4e58bfb843644 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -304,7 +304,7 @@ def parquet(self, *paths):
@ignore_unicode_prefix
@since(1.6)
- def text(self, paths):
+ def text(self, paths, wholetext=False):
"""
Loads text files and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
@@ -313,11 +313,16 @@ def text(self, paths):
Each line in the text file is a new row in the resulting DataFrame.
:param paths: string, or list of strings, for input path(s).
+ :param wholetext: if true, read each file from input path(s) as a single row.
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
>>> df.collect()
[Row(value=u'hello'), Row(value=u'this')]
+ >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
+ >>> df.collect()
+ [Row(value=u'hello\\nthis')]
"""
+ self._set_opts(wholetext=wholetext)
if isinstance(paths, basestring):
paths = [paths]
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ea1cf66775235..39fec8f983b65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -646,7 +646,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* Loads text files and returns a `DataFrame` whose schema starts with a string column named
* "value", and followed by partitioned columns if there are any.
*
- * Each line in the text files is a new row in the resulting DataFrame. For example:
+ * You can set the following text-specific option(s) for reading text files:
+ *
+ * - `wholetext` (default `false`): If true, read each file from the input path(s) as a single
+ *   row, rather than splitting it by "\n".
+ *
+ * By default, each line in the text files is a new row in the resulting DataFrame.
+ *
+ * Usage example:
* {{{
* // Scala:
* spark.read.text("/path/to/spark/README.md")
@@ -678,7 +685,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* If the directory structure of the text files contains partitioning information, those are
* ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
*
- * Each line in the text files is a new element in the resulting Dataset. For example:
+ * You can set the following textFile-specific option(s) for reading text files:
+ *
+ * - `wholetext` (default `false`): If true, read each file from the input path(s) as a single
+ *   element, rather than splitting it by "\n".
+ *
+ * By default, each line in the text files is a new element in the resulting Dataset. For example:
* {{{
* // Scala:
* spark.read.textFile("/path/to/spark/README.md")
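For reference, a minimal usage sketch of the new option through the user-facing API (the paths below are placeholders, not part of this patch):

    // One row per file: the whole file content lands in the single "value" column.
    val perFile = spark.read.option("wholetext", "true").text("/path/to/dir")

    // Default behaviour (wholetext=false): one row per line.
    val perLine = spark.read.text("/path/to/dir")

    // The Dataset[String] variant honours the same option.
    val perFileDs = spark.read.option("wholetext", "true").textFile("/path/to/dir")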
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
new file mode 100644
index 0000000000000..c61a89e6e8c3f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.Closeable
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.input.WholeTextFileRecordReader
+
+/**
+ * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], where a single [[Text]]
+ * record holds the whole content of the file.
+ */
+class HadoopFileWholeTextReader(file: PartitionedFile, conf: Configuration)
+ extends Iterator[Text] with Closeable {
+ private val iterator = {
+ val fileSplit = new CombineFileSplit(
+ Array(new Path(new URI(file.filePath))),
+ Array(file.start),
+ Array(file.length),
+ // TODO: Implement Locality
+ Array.empty[String])
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+ val reader = new WholeTextFileRecordReader(fileSplit, hadoopAttemptContext, 0)
+ reader.initialize(fileSplit, hadoopAttemptContext)
+ new RecordReaderIterator(reader)
+ }
+
+ override def hasNext: Boolean = iterator.hasNext
+
+ override def next(): Text = iterator.next()
+
+ override def close(): Unit = iterator.close()
+}
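A rough sketch of how this adaptor can be driven in isolation, assuming a hand-built PartitionedFile (the real call site is TextFileFormat.readToUnsafeMem below; the path and length are placeholders):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.{HadoopFileWholeTextReader, PartitionedFile}

    // Wrap a single local file as a PartitionedFile and read it whole.
    val file = PartitionedFile(
      partitionValues = InternalRow.empty,
      filePath = "file:///tmp/whole.txt",   // placeholder path
      start = 0,
      length = 1024)                        // placeholder file length in bytes
    val reader = new HadoopFileWholeTextReader(file, new Configuration())
    try {
      // The iterator yields a single Text record holding the entire file contents.
      reader.foreach(text => println(text.toString))
    } finally {
      reader.close()
    }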
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index d0690445d7672..8a6ab303fc0f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -17,12 +17,16 @@
package org.apache.spark.sql.execution.datasources.text
+import java.io.Closeable
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.TaskContext
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
@@ -53,6 +57,14 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
}
}
+ override def isSplitable(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ path: Path): Boolean = {
+ val textOptions = new TextOptions(options)
+ super.isSplitable(sparkSession, options, path) && !textOptions.wholeText
+ }
+
override def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
@@ -97,14 +109,25 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
assert(
requiredSchema.length <= 1,
"Text data source only produces a single data column named \"value\".")
-
+ val textOptions = new TextOptions(options)
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+ readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions.wholeText)
+ }
+
+ private def readToUnsafeMem(
+ conf: Broadcast[SerializableConfiguration],
+ requiredSchema: StructType,
+ wholeTextMode: Boolean): (PartitionedFile) => Iterator[UnsafeRow] = {
+
(file: PartitionedFile) => {
- val reader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+ val confValue = conf.value.value
+ val reader = if (!wholeTextMode) {
+ new HadoopFileLinesReader(file, confValue)
+ } else {
+ new HadoopFileWholeTextReader(file, confValue)
+ }
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close()))
-
if (requiredSchema.isEmpty) {
val emptyUnsafeRow = new UnsafeRow(0)
reader.map(_ => emptyUnsafeRow)
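For context, the remainder of the per-file function (outside this hunk) is untouched: every Text record, whether one line or one whole file, is copied into a single-column UnsafeRow. Roughly, and reusing the imports already in this file, the existing non-empty-schema branch looks like this (a sketch, not part of the patch):

    val unsafeRow = new UnsafeRow(1)
    val bufferHolder = new BufferHolder(unsafeRow)
    val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)

    reader.map { line =>
      // Copy the record's bytes into column 0 of the reused UnsafeRow.
      bufferHolder.reset()
      unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
      unsafeRow.setTotalSize(bufferHolder.totalSize())
      unsafeRow
    }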
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index 49bd7382f9cf3..2a661561ab51e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -33,8 +33,15 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
* Compression codec to use.
*/
val compressionCodec = parameters.get(COMPRESSION).map(CompressionCodecs.getCodecClassName)
+
+ /**
+ * wholetext - If true, read each file as a single row rather than splitting it by "\n".
+ */
+ val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
+
}
private[text] object TextOptions {
val COMPRESSION = "compression"
+ val WHOLETEXT = "wholetext"
}
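Because the parameters arrive as a CaseInsensitiveMap, the key can be spelled in any case, and the boolean-typed option setter works as well; a small usage sketch (the path is a placeholder):

    val path = "/path/to/text/dir"  // placeholder
    spark.read.option("wholetext", "true").text(path)
    spark.read.option("WholeText", true).text(path)                  // key is matched case-insensitively
    spark.read.format("text").option("wholetext", true).load(path)   // same option via format/load

Note that values other than "true"/"false" will fail in String.toBoolean with an IllegalArgumentException.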
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index cb7393cdd2b9d..33287044f279e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -185,10 +185,9 @@ class TextSuite extends QueryTest with SharedSQLContext {
val data = df.collect()
assert(data(0) == Row("This is a test file for the text data source"))
assert(data(1) == Row("1+1"))
- // non ascii characters are not allowed in the code, so we disable the scalastyle here.
- // scalastyle:off
+ // scalastyle:off nonascii
assert(data(2) == Row("数据砖头"))
- // scalastyle:on
+ // scalastyle:on nonascii
assert(data(3) == Row("\"doh\""))
assert(data.length == 4)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
new file mode 100644
index 0000000000000..8bd736bee69de
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.text
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class WholeTextFileSuite extends QueryTest with SharedSQLContext {
+
+ // Hadoop's FileSystem caching does not use the Configuration as part of its cache key, which
+ // can cause Filesystem.get(Configuration) to return a cached instance created with a different
+ // configuration than the one passed to get() (see HADOOP-8490 for more details). This caused
+ // hard-to-reproduce test failures, since any suites that were run after this one would inherit
+ // the new value of "fs.local.block.size" (see SPARK-5227 and SPARK-5679). To work around this,
+ // we disable FileSystem caching in this suite.
+ protected override def sparkConf =
+ super.sparkConf.set("spark.hadoop.fs.file.impl.disable.cache", "true")
+
+ private def testFile: String = {
+ Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString
+ }
+
+ test("reading text file with option wholetext=true") {
+ val df = spark.read.option("wholetext", "true")
+ .format("text").load(testFile)
+ // schema
+ assert(df.schema == new StructType().add("value", StringType))
+
+ // verify content
+ val data = df.collect()
+ assert(data(0) ==
+ Row(
+ // scalastyle:off nonascii
+ """This is a test file for the text data source
+ |1+1
+ |数据砖头
+ |"doh"
+ |""".stripMargin))
+ // scalastyle:on nonascii
+ assert(data.length == 1)
+ }
+
+ test("correctness of wholetext option") {
+ import org.apache.spark.sql.catalyst.util._
+ withTempDir { dir =>
+ val file1 = new File(dir, "text1.txt")
+ stringToFile(file1,
+ """text file 1 contents.
+ |From: None to: ??
+ """.stripMargin)
+ val file2 = new File(dir, "text2.txt")
+ stringToFile(file2, "text file 2 contents.")
+ val file3 = new File(dir, "text3.txt")
+ stringToFile(file3, "text file 3 contents.")
+ val df = spark.read.option("wholetext", "true").text(dir.getAbsolutePath)
+ // The wholetext option reads each file into a single row, so we expect one row per file.
+ val data = df.sort("value").collect()
+ assert(data.length == 3)
+ // Each file should be represented by a single Row in the DataFrame/Dataset.
+ assert(data(0) == Row(
+ """text file 1 contents.
+ |From: None to: ??
+ """.stripMargin))
+ assert(data(1) == Row("text file 2 contents."))
+ assert(data(2) == Row("text file 3 contents."))
+ }
+ }
+
+ test("correctness of wholetext option with gzip compression") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+ val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s").repartition(1)
+ df1.write.option("compression", "gzip").mode("overwrite").text(path)
+ // When reading in wholetext mode, each file is read as a single row, i.e. its contents are
+ // not split on newline characters.
+ val expected = Row(Range(0, 1000).mkString("", "\n", "\n"))
+ Seq(10, 100, 1000).foreach { bytes =>
+ withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> bytes.toString) {
+ val df2 = spark.read.option("wholetext", "true").format("text").load(path)
+ val result = df2.collect().head
+ assert(result === expected)
+ }
+ }
+ }
+ }
+}