7 changes: 6 additions & 1 deletion python/pyspark/sql/readwriter.py
@@ -304,7 +304,7 @@ def parquet(self, *paths):

@ignore_unicode_prefix
@since(1.6)
def text(self, paths):
def text(self, paths, wholetext=False):
"""
Loads text files and returns a :class:`DataFrame` whose schema starts with a
string column named "value", followed by partitioned columns if there are any.
@@ -313,11 +313,16 @@ def text(self, paths):
Each line in the text file is a new row in the resulting DataFrame.

:param paths: string, or list of strings, for input path(s).
:param wholetext: if true, read each file from input path(s) as a single row.

>>> df = spark.read.text('python/test_support/sql/text-test.txt')
>>> df.collect()
[Row(value=u'hello'), Row(value=u'this')]
>>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
>>> df.collect()
[Row(value=u'hello\\nthis')]
"""
self._set_opts(wholetext=wholetext)
if isinstance(paths, basestring):
paths = [paths]
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
16 changes: 14 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -646,7 +646,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* Loads text files and returns a `DataFrame` whose schema starts with a string column named
* "value", followed by partitioned columns if there are any.
*
* Each line in the text files is a new row in the resulting DataFrame. For example:
* You can set the following text-specific option(s) for reading text files:
* <ul>
* <li>`wholetext` (default `false`): If true, read each file as a single row instead of
* splitting on "\n".</li>
* </ul>
* By default, each line in the text files is a new row in the resulting DataFrame.
*
* Usage example:
* {{{
* // Scala:
* spark.read.text("/path/to/spark/README.md")
@@ -678,7 +685,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* If the directory structure of the text files contains partitioning information, those are
* ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
*
* Each line in the text files is a new element in the resulting Dataset. For example:
* You can set the following textFile-specific option(s) for reading text files:
* <ul>
* <li>`wholetext` (default `false`): If true, read each file as a single row instead of
* splitting on "\n".</li>
* </ul>
* By default, each line in the text files is a new element in the resulting Dataset. For example:
* {{{
* // Scala:
* spark.read.textFile("/path/to/spark/README.md")
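A short usage sketch (the directory path is a placeholder) tying the new option to the two entry points documented above:

// With wholetext=true, each input file becomes one row/element instead of one row per line.
val dfWhole = spark.read.option("wholetext", "true").text("/path/to/dir")
val dsWhole = spark.read.option("wholetext", "true").textFile("/path/to/dir")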
57 changes: 57 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import java.io.Closeable
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.input.WholeTextFileRecordReader

/**
* An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which yields the entire
* content of the file as a single record.
*/
class HadoopFileWholeTextReader(file: PartitionedFile, conf: Configuration)
Member: I'd like to point out that a PartitionedFile can be just a part of an input file, instead of a whole file. So you cannot guarantee that in this case the reader reads all of the file's content.

Member: We may want to override isSplitable of TextFileFormat and return false when the wholetext option is enabled.

Member (Author): Thank you for catching this.

extends Iterator[Text] with Closeable {
private val iterator = {
val fileSplit = new CombineFileSplit(
Array(new Path(new URI(file.filePath))),
Array(file.start),
Array(file.length),
// TODO: Implement Locality
Array.empty[String])
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
val reader = new WholeTextFileRecordReader(fileSplit, hadoopAttemptContext, 0)
reader.initialize(fileSplit, hadoopAttemptContext)
new RecordReaderIterator(reader)
}

override def hasNext: Boolean = iterator.hasNext

override def next(): Text = iterator.next()

override def close(): Unit = iterator.close()
}
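For illustration, a minimal sketch of driving this reader directly; the path and the length literal are placeholders, and PartitionedFile's case-class signature is assumed to be the Spark 2.x one (partition values, path, start, length):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{HadoopFileWholeTextReader, PartitionedFile}

val conf = new Configuration()
// A single PartitionedFile spanning the whole file; 42L stands in for the real file length.
val file = PartitionedFile(InternalRow.empty, "file:///tmp/sample.txt", 0, 42L)
val reader = new HadoopFileWholeTextReader(file, conf)
try {
  // In wholetext mode the iterator yields exactly one Text record with the full file content.
  reader.foreach(record => println(record.toString))
} finally {
  reader.close()
}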
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -17,12 +17,16 @@

package org.apache.spark.sql.execution.datasources.text

import java.io.Closeable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

import org.apache.spark.TaskContext
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
@@ -53,6 +57,14 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
}
}

override def isSplitable(
sparkSession: SparkSession,
options: Map[String, String],
path: Path): Boolean = {
val textOptions = new TextOptions(options)
super.isSplitable(sparkSession, options, path) && !textOptions.wholeText
}

override def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
@@ -97,14 +109,25 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
assert(
requiredSchema.length <= 1,
"Text data source only produces a single data column named \"value\".")

val textOptions = new TextOptions(options)
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions.wholeText)
}

private def readToUnsafeMem(
    conf: Broadcast[SerializableConfiguration],
    requiredSchema: StructType,
    wholeTextMode: Boolean): (PartitionedFile) => Iterator[UnsafeRow] = {

(file: PartitionedFile) => {
val reader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
val confValue = conf.value.value
val reader = if (!wholeTextMode) {
new HadoopFileLinesReader(file, confValue)
} else {
new HadoopFileWholeTextReader(file, confValue)
}
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close()))

if (requiredSchema.isEmpty) {
val emptyUnsafeRow = new UnsafeRow(0)
reader.map(_ => emptyUnsafeRow)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -33,8 +33,15 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
* Compression codec to use.
*/
val compressionCodec = parameters.get(COMPRESSION).map(CompressionCodecs.getCodecClassName)

/**
* wholetext - If true, read each file as a single row instead of splitting on "\n".
*/
val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean

}

private[text] object TextOptions {
val COMPRESSION = "compression"
val WHOLETEXT = "wholetext"
}
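A tiny sketch of how the flag is parsed; TextOptions is private[text], so this would only compile inside that package, and the key casing is chosen to show that option names are case-insensitive:

// The constructor accepts a plain Map, as in the buildReader change above.
val opts = new TextOptions(Map("WHOLEtext" -> "true"))
assert(opts.wholeText)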
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -185,10 +185,9 @@ class TextSuite extends QueryTest with SharedSQLContext {
val data = df.collect()
assert(data(0) == Row("This is a test file for the text data source"))
assert(data(1) == Row("1+1"))
// non ascii characters are not allowed in the code, so we disable the scalastyle here.
// scalastyle:off
// scalastyle:off nonascii
assert(data(2) == Row("数据砖头"))
// scalastyle:on
// scalastyle:on nonascii
assert(data(3) == Row("\"doh\""))
assert(data.length == 4)
}
108 changes: 108 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.text

import java.io.File

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{StringType, StructType}

class WholeTextFileSuite extends QueryTest with SharedSQLContext {

// Hadoop's FileSystem caching does not use the Configuration as part of its cache key, which
// can cause FileSystem.get(Configuration) to return a cached instance created with a different
// configuration than the one passed to get() (see HADOOP-8490 for more details). This caused
// hard-to-reproduce test failures, since any suites that were run after this one would inherit
// the new value of "fs.local.block.size" (see SPARK-5227 and SPARK-5679). To work around this,
// we disable FileSystem caching in this suite.
protected override def sparkConf =
super.sparkConf.set("spark.hadoop.fs.file.impl.disable.cache", "true")

private def testFile: String = {
Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString
}

test("reading text file with option wholetext=true") {
val df = spark.read.option("wholetext", "true")
.format("text").load(testFile)
// schema
assert(df.schema == new StructType().add("value", StringType))

// verify content
val data = df.collect()
assert(data(0) ==
Row(
// scalastyle:off nonascii
"""This is a test file for the text data source
|1+1
|数据砖头
|"doh"
|""".stripMargin))
// scalastyle:on nonascii
assert(data.length == 1)
}

test("correctness of wholetext option") {
import org.apache.spark.sql.catalyst.util._
withTempDir { dir =>
val file1 = new File(dir, "text1.txt")
stringToFile(file1,
"""text file 1 contents.
|From: None to: ??
""".stripMargin)
val file2 = new File(dir, "text2.txt")
stringToFile(file2, "text file 2 contents.")
val file3 = new File(dir, "text3.txt")
stringToFile(file3, "text file 3 contents.")
val df = spark.read.option("wholetext", "true").text(dir.getAbsolutePath)
// Since the wholetext option reads each file into a single row, the row count should equal
// the number of files.
val data = df.sort("value").collect()
assert(data.length == 3)
// Each file should be a single Row/element in the DataFrame/Dataset.
assert(data(0) == Row(
"""text file 1 contents.
|From: None to: ??
""".stripMargin))
assert(data(1) == Row("text file 2 contents."))
assert(data(2) == Row("text file 3 contents."))
}
}


test("Correctness of wholetext option with gzip compression mode.") {
withTempDir { dir =>
val path = dir.getCanonicalPath
val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s").repartition(1)
df1.write.option("compression", "gzip").mode("overwrite").text(path)
// When read in wholetext mode, each file becomes a single row, i.e. its content is not
// split on the newline character.
val expected = Row(Range(0, 1000).mkString("", "\n", "\n"))
Seq(10, 100, 1000).foreach { bytes =>
withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> bytes.toString) {
val df2 = spark.read.option("wholetext", "true").format("text").load(path)
val result = df2.collect().head
assert(result === expected)
}
}
}
}
}