
Commit d86325f

Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop InputFormat
1 parent 5d46025 commit d86325f

4 files changed: +383 -16 lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 297 additions & 8 deletions
@@ -28,7 +28,18 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
-
+import org.apache.hadoop.io._
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat
+import org.apache.spark.api.java.function.PairFunction
+import scala.util.{Success, Failure, Try}
+import org.msgpack
+import org.msgpack.ScalaMessagePack
+import org.apache.hadoop.mapreduce.InputFormat
+
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.hadoop.mapred.SequenceFileOutputFormat
+import org.apache.hadoop.conf.Configuration
+import java.util
 
 private[spark] class PythonRDD[T: ClassManifest](
     parent: RDD[T],
@@ -185,7 +196,70 @@ private object SpecialLengths {
   val TIMING_DATA = -3
 }
 
-private[spark] object PythonRDD {
+case class TestClass(var id: String, var number: Int) {
+  def this() = this("", 0)
+}
+
+object TestHadoop extends App {
+
+  //PythonRDD.writeToStream((1, "bar"), new DataOutputStream(new FileOutputStream("/tmp/test.out")))
+
+
+  //val n = new NullWritable
+
+  import SparkContext._
+
+  val path = "/tmp/spark/test/sfarray/"
+  //val path = "/Users/Nick/workspace/java/faunus/output/job-0/"
+
+  val sc = new SparkContext("local[2]", "test")
+
+  //val rdd = sc.sequenceFile[NullWritable, FaunusVertex](path)
+  //val data = Seq((1.0, "aa"), (2.0, "bb"), (2.0, "aa"), (3.0, "cc"), (2.0, "bb"), (1.0, "aa"))
+  val data = Seq(
+    (1, Array(1.0, 2.0, 3.0)),
+    (2, Array(3.0, 4.0, 5.0)),
+    (3, Array(4.0, 5.0, 6.0))
+  )
+  val d = new DoubleWritable(5.0)
+  val a = new ArrayWritable(classOf[DoubleWritable], Array(d))
+
+  val rdd = sc.parallelize(data, numSlices = 2)
+    //.map({ case (k, v) => (new IntWritable(k), v.map(new DoubleWritable(_))) })
+    .map{ case (k, v) => (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_)))) }
+  rdd.saveAsNewAPIHadoopFile[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[IntWritable, ArrayWritable]](path)
+
+  /*
+  val data = Seq(
+    ("1", TestClass("test1", 123)),
+    ("2", TestClass("test2", 456)),
+    ("1", TestClass("test3", 123)),
+    ("3", TestClass("test56", 456)),
+    ("2", TestClass("test2", 123))
+  )
+  val rdd = sc.parallelize(data, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
+  rdd.saveAsNewAPIHadoopFile(path,
+    classOf[Text], classOf[TestClass],
+    classOf[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[Text, TestClass]])
+
+  //val rdd2 = Seq((1, ))
+
+  val seq = sc.sequenceFile[Double, String](path)
+  val seqR = seq.collect()
+
+  val packed = PythonRDD.serMsgPack(rdd)
+  val packedR = packed.collect()
+  val packed2 = PythonRDD.serMsgPack(seq)
+  val packedR2 = packed2.collect()
+
+  println(seqR.mkString(","))
+  println(packedR.mkString(","))
+  println(packedR2.mkString(","))
+  */
+
+}
+
+private[spark] object PythonRDD extends Logging {
 
   def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
   JavaRDD[Array[Byte]] = {
@@ -205,18 +279,233 @@ private[spark] object PythonRDD {
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
 
-  def writeToStream(elem: Any, dataOut: DataOutputStream) {
+  // PySpark / Hadoop InputFormat stuff
+
+  def register[T](clazz: Class[T], msgpack: ScalaMessagePack) = {
+    Try {
+      if (!clazz.isPrimitive) msgpack.register(clazz)
+    }.getOrElse(log.warn("Failed to register class (%s) with MsgPack. " +
+      "Falling back to default MsgPack serialization, or 'toString' as last resort".format(clazz.toString)))
+  }
+
+  // serialize and RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack
+  def serMsgPack[K, V](rdd: RDD[(K, V)]) = {
+    import org.msgpack.ScalaMessagePack._
+    val msgpack = new ScalaMessagePack with Serializable
+    val first = rdd.first()
+    val kc = ClassManifest.fromClass(first._1.getClass).asInstanceOf[ClassManifest[K]].erasure.asInstanceOf[Class[K]]
+    val vc = ClassManifest.fromClass(first._2.getClass).asInstanceOf[ClassManifest[V]].erasure.asInstanceOf[Class[V]]
+    register(kc, msgpack)
+    register(vc, msgpack)
+    /*
+    Try {
+      if (!kc.isPrimitive) msgpack.register(kc)
+      if (!vc.isPrimitive) msgpack.register(vc)
+    } match {
+      case Failure(err) => log.warn(("Failed to register key/value class (%s/%s) with MsgPack. " +
+        "Falling back to default MsgPack serialization, or 'toString' as last resort. " +
+        "Exception: %s").format(kc, vc, err.getMessage))
+    }
+    */
+    rdd.map{ pair =>
+      Try {
+        msgpack.write(pair)
+      } match {
+        case Failure(err) =>
+          Try {
+            write((pair._1.toString, pair._2.toString))
+          } match {
+            case Success(result) => result
+            case Failure(e) => throw e
+          }
+        case Success(result) => result
+
+      }
+      //write(_)
+    }
+  }
+
+  // SequenceFile converted to Text and then to String
+  def sequenceFile[K ,V](sc: JavaSparkContext,
+      path: String,
+      keyClass: String,
+      valueClass: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      minSplits: Int) = {
+    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClass)).asInstanceOf[ClassManifest[K]]
+    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClass)).asInstanceOf[ClassManifest[V]]
+    val kc = kcm.erasure.asInstanceOf[Class[K]]
+    val vc = vcm.erasure.asInstanceOf[Class[V]]
+
+    val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
+    val converted = convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(serMsgPack[K, V](converted))
+    //JavaRDD.fromRDD(
+    //  .map{ case (a, b) => (a.toString, b.toString) }.map(stuff => write(stuff)))
+  }
+
+  /*
+  def sequenceFile[K, V](sc: JavaSparkContext,
+      path: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      minSplits: Int): JavaRDD[Array[Byte]] = {
+    val rdd = sc.sc.sequenceFile(path, classOf[Any], classOf[Any], minSplits)
+    val converted = convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(serMsgPack[K, V](converted))
+    //sequenceFile(sc, path, "java.lang.String", "java.lang.String", keyWrapper, valueWrapper, minSplits)
+  }
+  */
+
+  def mapToConf(map: java.util.HashMap[String, String]) = {
+    import collection.JavaConversions._
+    val conf = new Configuration()
+    map.foreach{ case (k, v) => conf.set(k, v) }
+    conf
+  }
+
+  /* Merges two configurations, keys from right overwrite any matching keys in left */
+  def mergeConfs(left: Configuration, right: Configuration) = {
+    import collection.JavaConversions._
+    right.iterator().foreach(entry => left.set(entry.getKey, entry.getValue))
+    left
+  }
+
+  // Arbitrary Hadoop InputFormat, key class and value class
+  def newHadoopFile[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
+      path: String,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = mapToConf(confAsMap)
+    val baseConf = sc.hadoopConfiguration()
+    val mergedConf = mergeConfs(baseConf, conf)
+    val rdd =
+      newHadoopFileFromClassNames[K, V, F](sc,
+        path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper, mergedConf)
+      //.map{ case (k, v) => (k.toString, v.toString) }
+    val converted = convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(serMsgPack[K, V](converted))
+    //JavaPairRDD.fromRDD(
+    //  newHadoopFileFromClassNames(sc, path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper)
+    //  .map(new PairFunction[(K, V), String, String] { def call(t: (K, V)) = (t._1.toString, t._2.toString) } )
+    //)
+  }
+
+  private def newHadoopFileFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
+      path: String,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      conf: Configuration) = {
+    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
+    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
+    implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
+    val kc = kcm.erasure.asInstanceOf[Class[K]]
+    val vc = vcm.erasure.asInstanceOf[Class[V]]
+    val fc = fcm.erasure.asInstanceOf[Class[F]]
+    sc.sc.newAPIHadoopFile(path, fc, kc, vc, conf)
+  }
+
+  /*
+  private def sequenceFile[K, V](sc: JavaSparkContext,
+      path: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      minSplits: Int) = {
+    implicit val kcm = ClassManifest.fromClass(Class.forName("Any")).asInstanceOf[ClassManifest[K]]
+    implicit val vcm = ClassManifest.fromClass(Class.forName("Any")).asInstanceOf[ClassManifest[V]]
+    val kc = kcm.erasure.asInstanceOf[Class[K]]
+    val vc = vcm.erasure.asInstanceOf[Class[V]]
+
+    val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
+    val converted = convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(serMsgPack[K, V](converted))
+
+    /*
+    val rdd = if (kc.isInstanceOf[Writable] && vc.isInstanceOf[Writable]) {
+      val writables = sc.sc.sequenceFile(path, kc.asInstanceOf[Class[Writable]], vc.asInstanceOf[Class[Writable]], minSplits)
+      val w = writables.map{case (k,v) => (t.convert(k), t.convert(v))}
+      //implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K <:< Writable]]
+      //ClassManifest.fromClass(kc.asInstanceOf[Class[Writable]])
+      //sequenceFileWritable(sc, path ,minSplits).asInstanceOf[RDD[(K, V)]]
+      //sequenceFileWritable(sc, kc, vc, path, minSplits)
+    }
+    else {
+      sc.sc.sequenceFile[K, V](path, minSplits)
+
+    }
+
+    */
+  }
+  */
+
+  private def convertRDD[K, V](rdd: RDD[(K, V)]) = {
+    rdd.map{
+      case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V])
+      case (k: Writable, v) => (convert(k).asInstanceOf[K], v.asInstanceOf[V])
+      case (k, v: Writable) => (k.asInstanceOf[K], convert(v).asInstanceOf[V])
+      case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V])
+    }
+  }
+
+  private def convert(writable: Writable): Any = {
+    writable match {
+      case iw: IntWritable => SparkContext.intWritableConverter().convert(iw)
+      case dw: DoubleWritable => SparkContext.doubleWritableConverter().convert(dw)
+      case lw: LongWritable => SparkContext.longWritableConverter().convert(lw)
+      case fw: FloatWritable => SparkContext.floatWritableConverter().convert(fw)
+      case t: Text => SparkContext.stringWritableConverter().convert(t)
+      case bw: BooleanWritable => SparkContext.booleanWritableConverter().convert(bw)
+      case byw: BytesWritable => SparkContext.bytesWritableConverter().convert(byw)
+      case n: NullWritable => None
+      case aw: ArrayWritable => aw.get().map(convert(_))
+      case mw: MapWritable => mw.map{ case (k, v) => (convert(k), convert(v)) }.toMap
+      case other => other
+    }
+  }
+
+  /*
+  def sequenceFileWritable[K, V](sc: JavaSparkContext,
+      path: String,
+      minSplits: Int)
+      //(implicit km: ClassManifest[K], vm: ClassManifest[V])
+      //  kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
+    = {
+
+    import SparkContext._
+    implicit val kcm = ClassManifest.fromClass(keyClazz) //.asInstanceOf[ClassManifest[K]]
+    //implicit val vcm = ClassManifest.fromClass(valueClazz) //.asInstanceOf[ClassManifest[V]]
+    sc.sc.sequenceFile(path) //, kc, vc, minSplits)
+    // JavaRDD.fromRDD(serMsgPack[K, V](rdd))
+  }
+  */
+
+  //
+
+  def writeToStream(elem: Any, dataOut: DataOutputStream)(implicit m: ClassManifest[Any]) {
     elem match {
       case bytes: Array[Byte] =>
         dataOut.writeInt(bytes.length)
         dataOut.write(bytes)
-      case pair: (Array[Byte], Array[Byte]) =>
-        dataOut.writeInt(pair._1.length)
-        dataOut.write(pair._1)
-        dataOut.writeInt(pair._2.length)
-        dataOut.write(pair._2)
+      case (a: Array[Byte], b: Array[Byte]) =>
+        dataOut.writeInt(a.length)
+        dataOut.write(a)
+        dataOut.writeInt(b.length)
+        dataOut.write(b)
       case str: String =>
         dataOut.writeUTF(str)
+      //case (a: String, b: String) =>
+      //  dataOut.writeUTF(a)
+      //  dataOut.writeUTF(b)
       case other =>
         throw new SparkException("Unexpected element type " + other.getClass)
     }
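
For context, a hypothetical caller of the two new entry points above might look like the following sketch (not part of this commit). It assumes the SequenceFile written by TestHadoop exists at /tmp/spark/test/sfarray/, that the file is compiled somewhere under the org.apache.spark package (PythonRDD is private[spark]), and that the currently unused keyWrapper/valueWrapper parameters can be passed as empty strings; the object name is illustrative.

// Hypothetical usage sketch, mirroring how the PySpark gateway would call
// PythonRDD over Py4J with fully qualified class names.
package org.apache.spark.api.python

import org.apache.hadoop.io.{ArrayWritable, IntWritable}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.spark.api.java.JavaSparkContext

object SequenceFileToMsgPackExample extends App {
  val jsc = new JavaSparkContext("local[2]", "sequencefile-msgpack-example")

  // SequenceFile path: key/value classes are passed as class-name strings;
  // keyWrapper/valueWrapper are not used yet in this WIP.
  val packed = PythonRDD.sequenceFile[IntWritable, ArrayWritable](
    jsc,
    "/tmp/spark/test/sfarray/",
    "org.apache.hadoop.io.IntWritable",
    "org.apache.hadoop.io.ArrayWritable",
    "", "",
    2)
  println("sequenceFile records: " + packed.collect().size())

  // Arbitrary-InputFormat path: also takes the InputFormat class name and a
  // HashMap of extra Hadoop configuration, merged over sc.hadoopConfiguration.
  val extraConf = new java.util.HashMap[String, String]()
  val packed2 = PythonRDD.newHadoopFile[IntWritable, ArrayWritable,
      SequenceFileInputFormat[IntWritable, ArrayWritable]](
    jsc,
    "/tmp/spark/test/sfarray/",
    "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
    "org.apache.hadoop.io.IntWritable",
    "org.apache.hadoop.io.ArrayWritable",
    "", "",
    extraConf)
  println("newHadoopFile records: " + packed2.collect().size())

  jsc.stop()
}

In both cases each element of the returned JavaRDD[Array[Byte]] is one MsgPack-encoded (key, value) pair, which is what the Python side would deserialize.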

project/SparkBuild.scala

Lines changed: 2 additions & 1 deletion
@@ -248,7 +248,8 @@ object SparkBuild extends Build {
         "com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
         "com.codahale.metrics" % "metrics-graphite" % "3.0.0",
         "com.twitter" % "chill_2.9.3" % "0.3.1",
-        "com.twitter" % "chill-java" % "0.3.1"
+        "com.twitter" % "chill-java" % "0.3.1",
+        "org.msgpack" %% "msgpack-scala" % "0.6.8"
       )
     )
 
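
The msgpack-scala dependency added here backs serMsgPack and its per-class register calls in PythonRDD.scala. A minimal round-trip sketch under stated assumptions: register and write are the calls already used in the diff, read(bytes, Class) is assumed from the MessagePack 0.6.x Java API that ScalaMessagePack extends, and the Record/MsgPackRoundTrip names are illustrative.

import org.msgpack.ScalaMessagePack

// Same shape as the TestClass used above: vars plus a no-arg constructor,
// which is what the MsgPack templates expect.
case class Record(var id: String, var number: Int) {
  def this() = this("", 0)
}

object MsgPackRoundTrip extends App {
  val msgpack = new ScalaMessagePack with Serializable   // as in serMsgPack
  msgpack.register(classOf[Record])                      // mirrors PythonRDD.register
  val bytes: Array[Byte] = msgpack.write(Record("test1", 123))
  val back = msgpack.read(bytes, classOf[Record])        // read(...) is an assumed 0.6.x API call
  println(back)
}

Classes that MsgPack cannot register are handled in serMsgPack by falling back to writing the pair's toString values.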
