@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.{Logging, SparkException, Partition => SparkPartition}
import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}

private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
@@ -60,50 +60,21 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
extends OutputWriter {

private val recordWriter: RecordWriter[Void, InternalRow] = {
val conf = context.getConfiguration
val outputFormat = {
// When appending new Parquet files to an existing Parquet file directory, to avoid
// overwriting existing data files, we need to find out the max task ID encoded in these data
// file names.
// TODO Make this snippet a utility function for other data source developers
val maxExistingTaskId = {
// Note that `path` may point to a temporary location. Here we retrieve the real
// destination path from the configuration
val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
val fs = outputPath.getFileSystem(conf)

if (fs.exists(outputPath)) {
// Pattern used to match task ID in part file names, e.g.:
//
// part-r-00001.gz.parquet
// ^~~~~
val partFilePattern = """part-.-(\d{1,}).*""".r

fs.listStatus(outputPath).map(_.getPath.getName).map {
case partFilePattern(id) => id.toInt
case name if name.startsWith("_") => 0
case name if name.startsWith(".") => 0
case name => throw new AnalysisException(
s"Trying to write Parquet files to directory $outputPath, " +
s"but found items with illegal name '$name'.")
}.reduceOption(_ max _).getOrElse(0)
} else {
0
}
}

new ParquetOutputFormat[InternalRow]() {
// Here we override `getDefaultWorkFile` for two reasons:
//
// 1. To allow appending. We need to generate output file name based on the max available
// task ID computed above.
// 1. To allow appending. We need to generate unique output file names to avoid
// overwriting existing files (files that either existed before the write job or have just
// been written by other tasks within the same write job).
//
// 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
new Path(path, f"part-r-$split%05d$extension")
val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
val split = context.getTaskAttemptID.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
}
}
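
For illustration, here is a small sketch (not part of the patch) of how the new naming scheme avoids collisions between concurrent append jobs while keeping speculative attempts of the same task on the same file name; the `partFile` helper and the `.gz.parquet` extension are assumptions made only for this example:

```scala
import java.util.UUID

// Two concurrent append jobs each generate their own job-level UUID on the driver.
val jobA = UUID.randomUUID().toString
val jobB = UUID.randomUUID().toString

// Same formatting as getDefaultWorkFile above: task ID padded to five digits, then the job UUID.
def partFile(taskId: Int, jobUuid: String, extension: String): String =
  f"part-r-$taskId%05d-$jobUuid$extension"

// Task 3 of each job writes to a different file, so the jobs never clobber each other:
println(partFile(3, jobA, ".gz.parquet")) // part-r-00003-<jobA>.gz.parquet
println(partFile(3, jobB, ".gz.parquet")) // part-r-00003-<jobB>.gz.parquet

// Speculative attempts of the same task reuse the task ID (not the attempt ID), so they
// target the same name and the output committer can reconcile them:
assert(partFile(3, jobA, ".gz.parquet") == partFile(3, jobA, ".gz.parquet"))
```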
64 changes: 51 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -17,14 +17,13 @@

package org.apache.spark.sql.sources

import java.util.Date
import java.util.{Date, UUID}

import scala.collection.mutable

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter}
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}

import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
@@ -59,6 +58,28 @@ private[sql] case class InsertIntoDataSource(
}
}

/**
* A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
* Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
* single write job, and owns a UUID that identifies this job. Each concrete implementation of
* [[HadoopFsRelation]] should use this UUID together with the task ID to generate a unique file
* path for each task output file. This UUID is passed to the executor side via a property named
* `spark.sql.sources.writeJobUUID`.
*
* Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]],
* are used to write to normal tables and tables with dynamic partitions, respectively.
*
* The basic workflow of this command is:
*
* 1. Driver-side setup, including output committer initialization and data source specific
* preparation work for the write job to be issued.
* 2. Issues a write job consisting of one or more executor-side tasks, each of which writes all
* rows within an RDD partition.
* 3. If no exception is thrown in a task, commits that task; otherwise aborts that task. If any
* exception is thrown during task commit, also aborts that task.
* 4. If all tasks are committed, commits the job; otherwise aborts the job. If any exception is
* thrown during job commit, also aborts the job.
*/
private[sql] case class InsertIntoHadoopFsRelation(
@transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
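
The Scaladoc above spells out the contract between `InsertIntoHadoopFsRelation` and concrete data sources. A minimal sketch of an `OutputWriter` that honors the `spark.sql.sources.writeJobUUID` property might look like the following; the class name `ExampleOutputWriter`, the `.example` extension, and the no-op `write`/`close` bodies are illustrative assumptions, not part of this patch:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.OutputWriter

class ExampleOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
  // The UUID that InsertIntoHadoopFsRelation stored in the Hadoop configuration on the driver.
  private val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")

  // Use the task ID rather than the attempt ID so that speculative attempts of the same task
  // produce the same file name.
  private val taskId = context.getTaskAttemptID.getTaskID.getId

  // e.g. part-r-00003-<job-uuid>.example, unique across concurrent write jobs.
  private val outputFile = new Path(path, f"part-r-$taskId%05d-$uniqueWriteJobId.example")

  override def write(row: Row): Unit = ()  // a real writer would encode `row` into `outputFile`
  override def close(): Unit = ()          // a real writer would flush and close its stream here
}
```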
@@ -261,7 +282,14 @@ private[sql] abstract class BaseWriterContainer(
with Logging
with Serializable {

protected val serializableConf = new SerializableConfiguration(ContextUtil.getConfiguration(job))
protected val serializableConf = new SerializableConfiguration(job.getConfiguration)

// This UUID is used to avoid output file name collisions between different appending write jobs.
// These jobs may belong to different SparkContext instances. Concrete data source implementations
// may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
// The reason why this ID identifies a job rather than a single task output file is that
// speculative tasks must generate the same output file name as the original task.
private val uniqueWriteJobId = UUID.randomUUID()

// This is only used on driver side.
@transient private val jobContext: JobContext = job
@@ -290,6 +318,11 @@
setupIDs(0, 0, 0)
setupConf()

// This UUID is sent to the executor side together with the serialized `Configuration` object
// within the `Job` instance. `OutputWriters` on the executor side should use this UUID to
// generate unique task output file names.
job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)

// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
// configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
@@ -417,15 +450,16 @@ private[sql] class DefaultWriterContainer(
assert(writer != null, "OutputWriter instance should have been initialized")
writer.close()
super.commitTask()
} catch {
case cause: Throwable =>
super.abortTask()
throw new RuntimeException("Failed to commit task", cause)
} catch { case cause: Throwable =>
// This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
// cause `abortTask()` to be invoked.
throw new RuntimeException("Failed to commit task", cause)
}
}

override def abortTask(): Unit = {
try {
// It's possible that the task fails before `writer` gets initialized
if (writer != null) {
writer.close()
}
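
The comment added in `commitTask()` above leans on the per-task protocol from the `InsertIntoHadoopFsRelation` Scaladoc: commit on success, abort on any failure, including a failure thrown by the commit itself. Below is a self-contained sketch of that shape, with `TaskWriter` and `runTask` as illustrative names rather than Spark's actual API:

```scala
// Stand-in for a writer container; only the three operations relevant to the protocol.
trait TaskWriter {
  def writeAll(rows: Iterator[String]): Unit
  def commitTask(): Unit
  def abortTask(): Unit
}

def runTask(writer: TaskWriter, rows: Iterator[String]): Unit = {
  try {
    writer.writeAll(rows)
    writer.commitTask() // an exception thrown here is routed to abortTask() below as well
  } catch {
    case cause: Throwable =>
      writer.abortTask()
      throw new RuntimeException("Task failed while writing rows.", cause)
  }
}
```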
@@ -469,21 +503,25 @@ private[sql] class DynamicPartitionWriterContainer(
})
}

override def commitTask(): Unit = {
try {
private def clearOutputWriters(): Unit = {
if (outputWriters.nonEmpty) {
outputWriters.values.foreach(_.close())
outputWriters.clear()
}
}

override def commitTask(): Unit = {
try {
clearOutputWriters()
super.commitTask()
} catch { case cause: Throwable =>
super.abortTask()
throw new RuntimeException("Failed to commit task", cause)
}
}

override def abortTask(): Unit = {
try {
outputWriters.values.foreach(_.close())
outputWriters.clear()
clearOutputWriters()
} finally {
super.abortTask()
}
@@ -27,13 +27,13 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType

private[orc] object OrcFileOperator extends Logging{
private[orc] object OrcFileOperator extends Logging {
def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
val conf = config.getOrElse(new Configuration)
val fspath = new Path(pathStr)
val fs = fspath.getFileSystem(conf)
val orcFiles = listOrcFiles(pathStr, conf)

logDebug(s"Creating ORC Reader from ${orcFiles.head}")
// TODO Need to consider all files when schema evolution is taken into account.
OrcFile.createReader(fs, orcFiles.head)
}
@@ -42,6 +42,7 @@ private[orc] object OrcFileOperator extends Logging{
val reader = getFileReader(path, conf)
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
}

@@ -52,14 +53,14 @@ private[orc] object OrcFileOperator extends Logging{
def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
val path = origPath.makeQualified(fs)
val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
.filterNot(_.isDir)
.map(_.getPath)
.filterNot(_.getName.startsWith("_"))
.filterNot(_.getName.startsWith("."))

if (paths == null || paths.size == 0) {
if (paths == null || paths.isEmpty) {
throw new IllegalArgumentException(
s"orcFileOperator: path $path does not have valid orc files matching the pattern")
}
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, Reco
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
@@ -39,7 +40,6 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{Logging}
import org.apache.spark.util.SerializableConfiguration

/* Implicit conversions */
@@ -105,8 +105,9 @@ private[orc] class OrcOutputWriter(
recordWriterInstantiated = true

val conf = context.getConfiguration
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
val partition = context.getTaskAttemptID.getTaskID.getId
val filename = f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc"
val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"

new OrcOutputFormat().getRecordWriter(
new Path(path, filename).getFileSystem(conf),
@@ -49,7 +49,7 @@ import scala.collection.JavaConversions._
object TestHive
extends TestHiveContext(
new SparkContext(
System.getProperty("spark.sql.test.master", "local[2]"),
System.getProperty("spark.sql.test.master", "local[32]"),
Review comment (Contributor): Maybe we should still use local[*]?

Reply (Author): I think we'd better use a fixed number here to improve determinism (if we use 32 from the beginning, the ORC bug would be much easier to reproduce).
"TestSQLContext",
new SparkConf()
.set("spark.sql.test", "")
@@ -43,8 +43,14 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
orcTableDir.mkdir()
import org.apache.spark.sql.hive.test.TestHive.implicits._

// Originally we were using a 10-row RDD for testing. However, when default parallelism is
// greater than 10 (e.g., running on a node with 32 cores), this RDD contains empty partitions,
// which result in empty ORC files. Unfortunately, ORC doesn't handle empty files properly and
// causes build failures on Jenkins, which happens to have 32 cores. Please refer to SPARK-8501
// for more details. To work around this issue before fixing SPARK-8501, we simply increase the
// number of rows in this RDD to avoid empty partitions.
sparkContext
.makeRDD(1 to 10)
.makeRDD(1 to 100)
.map(i => OrcData(i, s"part-$i"))
.toDF()
.registerTempTable(s"orc_temp_table")
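
As a quick illustration of the empty-partition problem described in the comment above, the following sketch assumes a `SparkContext` named `sc` running with master `local[32]` (so `defaultParallelism` is 32); it is not part of the test suite:

```scala
// makeRDD without an explicit slice count uses sc.defaultParallelism partitions.
val sparse = sc.makeRDD(1 to 10)
val emptyCount = sparse.mapPartitions(it => Iterator(it.size)).collect().count(_ == 0)
println(s"empty partitions: $emptyCount out of ${sparse.partitions.length}") // most of the 32 are empty

// With 100 rows every partition gets at least one element, so no empty ORC files are written.
val dense = sc.makeRDD(1 to 100)
println(dense.mapPartitions(it => Iterator(it.size)).collect().count(_ == 0)) // 0
```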
@@ -70,43 +76,43 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
}

test("create temporary orc table") {
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 10).map(i => Row(i, s"part-$i")))
(1 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT * FROM normal_orc_source where intField > 5"),
(6 to 10).map(i => Row(i, s"part-$i")))
(6 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
(1 to 10).map(i => Row(1, s"part-$i")))
(1 to 100).map(i => Row(1, s"part-$i")))
}

test("create temporary orc table as") {
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 10).map(i => Row(i, s"part-$i")))
(1 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
(6 to 10).map(i => Row(i, s"part-$i")))
(6 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
(1 to 10).map(i => Row(1, s"part-$i")))
(1 to 100).map(i => Row(1, s"part-$i")))
}

test("appending insert") {
sql("INSERT INTO TABLE normal_orc_source SELECT * FROM orc_temp_table WHERE intField > 5")

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
(1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
Seq.fill(2)(Row(i, s"part-$i"))
})
}
@@ -119,7 +125,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {

checkAnswer(
sql("SELECT * FROM normal_orc_as_source"),
(6 to 10).map(i => Row(i, s"part-$i")))
(6 to 100).map(i => Row(i, s"part-$i")))
}
}

@@ -53,9 +53,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
numberFormat.setGroupingUsed(false)

override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
val split = context.getTaskAttemptID.getTaskID.getId
val name = FileOutputFormat.getOutputName(context)
new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}")
new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
}
}

@@ -156,6 +157,7 @@ class CommitFailureTestRelation(
context: TaskAttemptContext): OutputWriter = {
new SimpleTextOutputWriter(path, context) {
override def close(): Unit = {
super.close()
Review comment (Contributor): Do we need this?

Reply (Author): I was thinking about S3, where a file is not actually created before the output stream is closed (the PUT operation happens in NativeS3FsOutputStream.close()). But SimpleTextRelation is only used for local testing, so yeah, this line is not necessary.

Reply (Author): I decided to leave it there. The writer should be closed anyway. Otherwise it's leaked.
sys.error("Intentional task commitment failure for testing purpose.")
}
}