@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils
 import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.parse.SemanticException
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector._
@@ -104,91 +103,132 @@ case class InsertIntoHiveTable(
   }
 
   def saveAsHiveFile(
-      rdd: RDD[Writable],
+      rdd: RDD[(Writable, String)],
       valueClass: Class[_],
       fileSinkConf: FileSinkDesc,
-      conf: JobConf,
-      isCompressed: Boolean) {
+      conf: SerializableWritable[JobConf],
+      isCompressed: Boolean,
+      dynamicPartNum: Int) {
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    conf.setOutputValueClass(valueClass)
+    conf.value.setOutputValueClass(valueClass)
     if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
       throw new SparkException("Output format class not set")
     }
     // Doesn't work in Scala 2.9 due to what may be a generics bug
     // TODO: Should we uncomment this for Scala 2.10?
     // conf.setOutputFormat(outputFormatClass)
-    conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
+    conf.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
       // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
       // to store compression information.
-      conf.set("mapred.output.compress", "true")
+      conf.value.set("mapred.output.compress", "true")
       fileSinkConf.setCompressed(true)
-      fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec"))
-      fileSinkConf.setCompressType(conf.get("mapred.output.compression.type"))
+      fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec"))
+      fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type"))
     }
-    conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(
-      conf,
-      SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf))
+    conf.value.setOutputCommitter(classOf[FileOutputCommitter])
 
+    FileOutputFormat.setOutputPath(
+      conf.value,
+      SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf.value))
     log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
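+    // When there are no dynamic partitions, a single writer created up front handles all records;
+    // with dynamic partitions, writers are created lazily inside writeToFile, one per partition directory.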
+    var writer: SparkHiveHadoopWriter = null
+    // Writers for dynamic partitions, keyed by partition sub-path
+    var writerMap: scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] = null
+    if (dynamicPartNum == 0) {
+      writer = new SparkHiveHadoopWriter(conf.value, fileSinkConf)
+      writer.preSetup()
+    } else {
+      writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
+    }
 
-    val writer = new SparkHiveHadoopWriter(conf, fileSinkConf)
-    writer.preSetup()
-
-    def writeToFile(context: TaskContext, iter: Iterator[Writable]) {
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-
+    def writeToFile(context: TaskContext, iter: Iterator[(Writable, String)]) {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      // No dynamic partitions: set up and open the single writer created above
+      if (dynamicPartNum == 0) {
         writer.setup(context.stageId, context.partitionId, attemptNumber)
         writer.open()
+      } else {
 
-      var count = 0
-      while (iter.hasNext) {
-        val record = iter.next()
-        count += 1
-        writer.write(record)
-      }
-
-      writer.close()
-      writer.commit()
       }
-
-    sc.sparkContext.runJob(rdd, writeToFile _)
-    writer.commitJob()
-  }
-
-  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf): String = {
-    dynamicPartNum2 match {
-      case 0 => ""
-      case i => {
-        val colsNum = tableInfo.getProperties.getProperty("columns").split("\\,").length
-        val partColStr = tableInfo.getProperties.getProperty("partition_columns")
-        val partCols = partColStr.split("/")
-        var buf = new StringBuffer()
-        if (partCols.length == dynamicPartNum2) {
-          for (j <- 0 until partCols.length) {
-            buf.append("/").append(partCols(j)).append("=").append(handleNull(row(colsNum + j), jobConf))
-          }
-        } else {
-          for (j <- 0 until dynamicPartNum2) {
-            buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(handleNull(row(colsNum + j), jobConf))
+      var count = 0
+      // Writer for the current record's dynamic partition
+      var writer2: SparkHiveHadoopWriter = null
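+      // Route each record: a null partition path means no dynamic partitions, so the record is
+      // written by the single writer set up above; otherwise look up (or lazily create) the writer
+      // for that partition's output directory.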
+      while (iter.hasNext) {
+        val record = iter.next()
+        count += 1
+        if (record._2 == null) { // no dynamic partitions
+          writer.write(record._1)
+        } else { // dynamic partitions
+          val location = fileSinkConf.getDirName
+          val partLocation = location + record._2 // each partition gets its own output directory
+          writer2 = writerMap.get(record._2) match {
+            case Some(writer) => writer
+            case None => {
+              val tempWriter = new SparkHiveHadoopWriter(conf.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
+              tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
+              tempWriter.open(record._2)
+              writerMap += (record._2 -> tempWriter)
+              tempWriter
+            }
+          }
+          writer2.write(record._1)
         }
       }
-        buf.toString
+      if (dynamicPartNum == 0) {
+        writer.close()
+        writer.commit()
+      } else {
+        for ((k,v) <- writerMap) {
+          v.close()
+          v.commit()
+        }
+      }
     }
+
+    sc.sparkContext.runJob(rdd, writeToFile _)
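+    // Commit the job: a single commitJob without dynamic partitions, otherwise one per partition writer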
+    if (dynamicPartNum == 0) {
+      writer.commitJob()
+    } else {
+      for ((k,v) <- writerMap) {
+        v.commitJob()
+      }
+      writerMap.clear()
     }
-  }
 
-  def handleNull(obj: Any, jobConf: JobConf): String = {
-    if (obj == null || obj.toString.length == 0) {
-      jobConf.get("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__")
+
+
+  }
+  /*
+   * Builds the dynamic-partition sub-path for a row, e.g.
+   * for sql: INSERT ... tablename(part1, part2) SELECT ..., val1, val2 FROM ...
+   *   returns: /part1=val1/part2=val2
+   * for sql: INSERT ... tablename(part1=val1, part2) SELECT ..., val2 FROM ...
+   *   returns: /part2=val2
+   * for sql: INSERT ... tablename(part1=val1, part2, part3) SELECT ..., val2, val3 FROM ...
+   *   returns: /part2=val2/part3=val3
+   */
+  private def getDynamicPartDir(partCols: Array[String], row: Row, dynamicPartNum: Int, defaultPartName: String): String = {
+    assert(dynamicPartNum > 0)
+    partCols
+      .takeRight(dynamicPartNum)
+      .zip(row.takeRight(dynamicPartNum))
+      .map { case (c, v) => s"/$c=${handleNull(v, defaultPartName)}" }
+      .mkString
+  }
+  /*
+   * If rowVal is null or the empty string, return the default partition name
+   * (hive.exec.default.partition.name); otherwise return rowVal as a string.
+   */
+  private def handleNull(rowVal: Any, defaultPartName: String): String = {
+    if (rowVal == null || String.valueOf(rowVal).length == 0) {
+      defaultPartName
     } else {
-      obj.toString
+      String.valueOf(rowVal)
     }
   }
 
@@ -211,32 +251,32 @@ case class InsertIntoHiveTable(
     val tableLocation = table.hiveQlTable.getDataLocation
     val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-    var dynamicPartNum = 0
+    var tmpDynamicPartNum = 0
     var numStaPart = 0
-    var dynamicPartPath = "";
     val partitionSpec = partition.map {
-      case (key, Some(value)) => { numStaPart += 1; key -> value }
-      case (key, None) => { dynamicPartNum += 1; key -> "" }
+      case (key, Some(value)) =>
+        numStaPart += 1
+        key -> value
+      case (key, None) =>
+        tmpDynamicPartNum += 1
+        key -> ""
     }
-    // ORC stores compression information in table properties. While, there are other formats
-    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
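+    // tmpDynamicPartNum now holds the number of partition columns given no value in the spec,
+    // i.e. the dynamic ones.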
+    val dynamicPartNum = tmpDynamicPartNum
     val jobConf = new JobConf(sc.hiveconf)
     val jobConfSer = new SerializableWritable(jobConf)
     // check if the partition spec is valid
     if (dynamicPartNum > 0) {
       if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
-        throw new SemanticException(
-          ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
+        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
       }
       if (numStaPart == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
-        throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg());
+        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg())
       }
       // check if static partition appear after dynamic partitions
       for ((k,v) <- partitionSpec) {
         if (partitionSpec(k) == "") {
           if (numStaPart > 0) { // found a DP, but there exists ST as subpartition
-            throw new SemanticException(
-              ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg());
+            throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg())
           }
         } else {
           numStaPart -= 1
@@ -252,96 +292,40 @@ case class InsertIntoHiveTable(
         ObjectInspectorCopyOption.JAVA)
       .asInstanceOf[StructObjectInspector]
 
-
     val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
     val outputData = new Array[Any](fieldOIs.length)
+    val defaultPartName = jobConfSer.value.get("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__")
+    var partColStr: Array[String] = null
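+    // The "partition_columns" table property lists the partition column names, separated by "/"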
+    if (fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") != null) {
+      partColStr = fileSinkConf
+        .getTableInfo
+        .getProperties
+        .getProperty("partition_columns")
+        .split("/")
+    }
+
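+    // Pair each serialized row with its dynamic-partition sub-path (null when the insert has no
+    // dynamic partitions).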
     iter.map { row =>
+      var dynamicPartPath: String = null
+      if (dynamicPartNum > 0) {
+        dynamicPartPath = getDynamicPartDir(partColStr, row, dynamicPartNum, defaultPartName)
+      }
       var i = 0
       while (i < fieldOIs.length) {
-        if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
-          dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum, jobConfSer.value)
-        }
         // Casts Strings to HiveVarchars when necessary.
         outputData(i) = wrap(row(i), fieldOIs(i))
         i += 1
       }
 
-      serializer.serialize(outputData, standardOI)
+      serializer.serialize(outputData, standardOI) -> dynamicPartPath
     }
   }
-
-    if (dynamicPartNum > 0) {
-      if (outputClass == null) {
-        throw new SparkException("Output value class not set")
-      }
-      jobConfSer.value.setOutputValueClass(outputClass)
-      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
-        throw new SparkException("Output format class not set")
-      }
-      // Doesn't work in Scala 2.9 due to what may be a generics bug
-      // TODO: Should we uncomment this for Scala 2.10?
-      // conf.setOutputFormat(outputFormatClass)
-      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
-      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
-        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
-        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
-        // to store compression information.
-        jobConfSer.value.set("mapred.output.compress", "true")
-        fileSinkConf.setCompressed(true)
-        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
-        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
-      }
-      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
-
-      FileOutputFormat.setOutputPath(
-        jobConfSer.value,
-        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
-
-      var writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
-      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
-        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-        // around by taking a mod. We expect that no task will be attempted 2 billion times.
-        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-        val serializer = newSerializer(fileSinkConf.getTableInfo)
-        var count = 0
-        var writer2: SparkHiveHadoopWriter = null
-        while (iter.hasNext) {
-          val record = iter.next();
-          val location = fileSinkConf.getDirName
-          val partLocation = location + dynamicPartPath
-          writer2 = writerMap.get(dynamicPartPath) match {
-            case Some(writer) => writer
-            case None => {
-              val tempWriter = new SparkHiveHadoopWriter(jobConfSer.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
-              tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
-              tempWriter.open(dynamicPartPath);
-              writerMap += (dynamicPartPath -> tempWriter)
-              tempWriter
-            }
-          }
-          count += 1
-          writer2.write(record)
-        }
-        for ((k,v) <- writerMap) {
-          v.close()
-          v.commit()
-        }
-      }
-
-      sc.sparkContext.runJob(rdd, writeToFile2 _)
-
-      for ((k,v) <- writerMap) {
-        v.commitJob()
-      }
-      writerMap.clear()
-    } else {
     saveAsHiveFile(
       rdd,
       outputClass,
       fileSinkConf,
-      jobConf,
-      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
-  }
+      jobConfSer,
+      sc.hiveconf.getBoolean("hive.exec.compress.output", false),
+      dynamicPartNum)
 
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
     // Have to construct the format of dbname.tablename.
@@ -358,13 +342,13 @@ case class InsertIntoHiveTable(
     val inheritTableSpecs = true
     // TODO: Correctly set isSkewedStoreAsSubdir.
     val isSkewedStoreAsSubdir = false
-    if (dynamicPartNum > 0) {
+    if (dynamicPartNum> 0) {
       db.loadDynamicPartitions(
         outputPath,
         qualifiedTableName,
         partitionSpec,
         overwrite,
-        dynamicPartNum/* dpCtx.getNumDPCols() */,
+        dynamicPartNum,
         holdDDLTime,
         isSkewedStoreAsSubdir
       )