
Commit 0177dae

Fixes dynamic partitioning support for lower Hadoop versions

1 parent cf1d32e

File tree: 1 file changed (+22, −4)

sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala

Lines changed: 22 additions & 4 deletions
@@ -55,8 +55,8 @@ private[hive] class SparkHiveWriterContainer(
   private var taID: SerializableWritable[TaskAttemptID] = null

   @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient private lazy val committer = conf.value.getOutputCommitter
-  @transient private lazy val jobContext = newJobContext(conf.value, jID.value)
+  @transient protected lazy val committer = conf.value.getOutputCommitter
+  @transient protected lazy val jobContext = newJobContext(conf.value, jID.value)
   @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
   @transient private lazy val outputFormat =
     conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
@@ -122,8 +122,6 @@ private[hive] class SparkHiveWriterContainer(
     }
   }

-  // ********* Private Functions *********
-
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
     jobID = jobId
     splitID = splitId
@@ -157,12 +155,18 @@ private[hive] object SparkHiveWriterContainer {
   }
 }

+private[spark] object SparkHiveDynamicPartitionWriterContainer {
+  val SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs"
+}
+
 private[spark] class SparkHiveDynamicPartitionWriterContainer(
     @transient jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
     dynamicPartColNames: Array[String])
   extends SparkHiveWriterContainer(jobConf, fileSinkConf) {

+  import SparkHiveDynamicPartitionWriterContainer._
+
   private val defaultPartName = jobConf.get(
     ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)

@@ -179,6 +183,20 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     commit()
   }

+  override def commitJob(): Unit = {
+    // This is a hack to avoid writing _SUCCESS mark file. In lower versions of Hadoop (e.g. 1.0.4),
+    // semantics of FileSystem.globStatus() is different from higher versions (e.g. 2.4.1) and will
+    // include _SUCCESS file when glob'ing for dynamic partition data files.
+    //
+    // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does:
+    // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then
+    // load it with loadDynamicPartitions/loadPartition/loadTable.
+    val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
+    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
+    super.commitJob()
+    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
+  }
+
   override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
     val dynamicPartPath = dynamicPartColNames
       .zip(row.takeRight(dynamicPartColNames.length))
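
For reference, the save/toggle/restore pattern used in the commitJob override above can be sketched on its own. The snippet below is not part of the commit: the object name SuccessMarkerSketch, the helper withSuccessMarkerDisabled, and the try/finally wrapping are illustrative additions (the actual override restores the flag without a finally block). Only the configuration key matches the one introduced in SparkHiveDynamicPartitionWriterContainer.

import org.apache.hadoop.mapred.JobConf

object SuccessMarkerSketch {
  // Same key as SparkHiveDynamicPartitionWriterContainer.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER.
  val SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs"

  // Runs `commitFn` (e.g. super.commitJob()) with the _SUCCESS marker disabled,
  // then restores whatever value the JobConf held before, so other jobs reusing
  // the same configuration still get their _SUCCESS file.
  def withSuccessMarkerDisabled(jobConf: JobConf)(commitFn: => Unit): Unit = {
    val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
    try {
      commitFn
    } finally {
      jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
    }
  }
}

Suppressing the marker is what keeps FileSystem.globStatus() on older Hadoop (e.g. 1.0.4) from matching a _SUCCESS file alongside the dynamic partition data files, as the in-code comment in the diff explains.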

0 commit comments
