@@ -55,8 +55,8 @@ private[hive] class SparkHiveWriterContainer(
   private var taID: SerializableWritable[TaskAttemptID] = null
 
   @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient private lazy val committer = conf.value.getOutputCommitter
-  @transient private lazy val jobContext = newJobContext(conf.value, jID.value)
+  @transient protected lazy val committer = conf.value.getOutputCommitter
+  @transient protected lazy val jobContext = newJobContext(conf.value, jID.value)
   @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
   @transient private lazy val outputFormat =
     conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
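
The only change in this hunk is the visibility of `committer` and `jobContext`, promoted from `private` to `protected` so that the dynamic partition subclass introduced below can take part in the commit path. A minimal, standalone sketch of the Scala mechanics (illustrative names only, not the real Spark/Hadoop types):

```scala
// Standalone sketch, illustrative names only (not the real Spark/Hadoop types).
// With `private`, the subclass below would fail to compile; `protected` lets it
// reach the inherited commit machinery.
class BaseWriterContainer {
  @transient protected lazy val committer: String = "output-committer"

  def commitJob(): Unit = println(s"committing job via $committer")
}

class DynamicPartitionContainerSketch extends BaseWriterContainer {
  override def commitJob(): Unit = {
    // Subclass code can now refer to `committer` directly.
    println(s"about to commit, committer = $committer")
    super.commitJob()
  }
}

object VisibilitySketch extends App {
  new DynamicPartitionContainerSketch().commitJob()
}
```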
@@ -122,8 +122,6 @@ private[hive] class SparkHiveWriterContainer(
     }
   }
 
-  // ********* Private Functions *********
-
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
     jobID = jobId
     splitID = splitId
@@ -157,12 +155,18 @@ private[hive] object SparkHiveWriterContainer {
   }
 }
 
+private[spark] object SparkHiveDynamicPartitionWriterContainer {
+  val SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs"
+}
+
 private[spark] class SparkHiveDynamicPartitionWriterContainer(
     @transient jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
     dynamicPartColNames: Array[String])
   extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
 
+  import SparkHiveDynamicPartitionWriterContainer._
+
   private val defaultPartName = jobConf.get(
     ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
 
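
For context, the constant added here is the standard Hadoop `FileOutputCommitter` switch that decides whether a `_SUCCESS` marker file is written into the job's output directory on a successful commit. A hedged sketch of how that flag behaves on a plain Hadoop `Configuration` (the object name here is illustrative):

```scala
import org.apache.hadoop.conf.Configuration

// Sketch only: shows the semantics of the flag stored in the new constant.
object SuccessMarkerFlagSketch extends App {
  val SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs"

  val conf = new Configuration()
  // Unset means "write _SUCCESS on successful job commit" (the default is true).
  println(conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true))   // true
  // Setting it to false suppresses the marker file.
  conf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
  println(conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false))  // false
}
```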
@@ -179,6 +183,20 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     commit()
   }
 
+  override def commitJob(): Unit = {
+    // This is a hack to avoid writing _SUCCESS mark file. In lower versions of Hadoop (e.g. 1.0.4),
+    // semantics of FileSystem.globStatus() is different from higher versions (e.g. 2.4.1) and will
+    // include _SUCCESS file when glob'ing for dynamic partition data files.
+    //
+    // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does:
+    // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then
+    // load it with loadDynamicPartitions/loadPartition/loadTable.
+    val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
+    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
+    super.commitJob()
+    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
+  }
+
   override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
     val dynamicPartPath = dynamicPartColNames
       .zip(row.takeRight(dynamicPartColNames.length))
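
The new `commitJob()` override is a save/toggle/restore around `super.commitJob()`: record the caller's setting, disable the marker so `FileOutputCommitter` skips `_SUCCESS`, commit, then put the old value back. Below is a hedged sketch of that pattern, with a hypothetical `doCommit` standing in for `super.commitJob()`; the `try`/`finally` is an extra safeguard not present in the diff, so the flag is restored even if the commit throws.

```scala
import org.apache.hadoop.conf.Configuration

// Sketch of the commitJob() workaround above; `doCommit` is a stand-in for
// super.commitJob(), and the real code operates on a JobConf rather than a
// bare Configuration.
object CommitWithoutSuccessMarker {
  val SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs"

  def commitSuppressingMarker(conf: Configuration)(doCommit: => Unit): Unit = {
    // Remember whatever the caller had configured (Hadoop defaults this to true).
    val oldMarker = conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
    conf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
    try {
      doCommit // no _SUCCESS file is written during this commit
    } finally {
      // Restore the original value so the shared conf is left untouched.
      conf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
    }
  }
}
```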