
Commit 4b0a43f

Refactoring utils into own objects. Cleaning up old commented-out code
1 parent d86325f commit 4b0a43f

20 files changed: +119 additions, -168 deletions
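
In outline, the commit moves the Hadoop Configuration helpers and the MsgPack serialization helpers out of PythonRDD into two new private[python] objects, PythonHadoopUtil and SerDeUtil, to which PythonRDD now delegates. A minimal sketch of the resulting call path, assembled from the diffs below (the surrounding method plumbing is elided):

    // Sketch only: how PythonRDD delegates after this refactor (see the diffs below).
    val conf       = PythonHadoopUtil.mapToConf(confAsMap)        // java.util.HashMap -> Hadoop Configuration
    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)  // keys from the right-hand conf win
    val converted  = SerDeUtil.convertRDD[K, V](rdd)              // unwrap Writables into Scala values
    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))        // RDD[(K, V)] -> RDD[Array[Byte]] via MsgPack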
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+package org.apache.spark.api.python
+
+import org.apache.hadoop.conf.Configuration
+
+/**
+ * Utilities for working with Python objects -> Hadoop-related objects
+ */
+private[python] object PythonHadoopUtil {
+
+  def mapToConf(map: java.util.HashMap[String, String]) = {
+    import collection.JavaConversions._
+    val conf = new Configuration()
+    map.foreach{ case (k, v) => conf.set(k, v) }
+    conf
+  }
+
+  /* Merges two configurations, keys from right overwrite any matching keys in left */
+  def mergeConfs(left: Configuration, right: Configuration) = {
+    import collection.JavaConversions._
+    right.iterator().foreach(entry => left.set(entry.getKey, entry.getValue))
+    left
+  }
+
+}
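
As a quick illustration of the two helpers above, a sketch of the merge semantics (the configuration key and values here are made up for the example):

    import org.apache.hadoop.conf.Configuration

    val base = new Configuration()
    base.set("example.setting", "from-spark-context")       // hypothetical key/value

    val fromPython = new java.util.HashMap[String, String]()
    fromPython.put("example.setting", "from-python-dict")   // hypothetical key/value

    val right  = PythonHadoopUtil.mapToConf(fromPython)
    val merged = PythonHadoopUtil.mergeConfs(base, right)
    // merged.get("example.setting") == "from-python-dict"  -- keys from `right` overwrite `left`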

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 7 additions & 164 deletions
@@ -281,51 +281,7 @@ private[spark] object PythonRDD extends Logging {
 
   // PySpark / Hadoop InputFormat stuff
 
-  def register[T](clazz: Class[T], msgpack: ScalaMessagePack) = {
-    Try {
-      if (!clazz.isPrimitive) msgpack.register(clazz)
-    }.getOrElse(log.warn("Failed to register class (%s) with MsgPack. " +
-      "Falling back to default MsgPack serialization, or 'toString' as last resort".format(clazz.toString)))
-  }
-
-  // serialize and RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack
-  def serMsgPack[K, V](rdd: RDD[(K, V)]) = {
-    import org.msgpack.ScalaMessagePack._
-    val msgpack = new ScalaMessagePack with Serializable
-    val first = rdd.first()
-    val kc = ClassManifest.fromClass(first._1.getClass).asInstanceOf[ClassManifest[K]].erasure.asInstanceOf[Class[K]]
-    val vc = ClassManifest.fromClass(first._2.getClass).asInstanceOf[ClassManifest[V]].erasure.asInstanceOf[Class[V]]
-    register(kc, msgpack)
-    register(vc, msgpack)
-    /*
-    Try {
-      if (!kc.isPrimitive) msgpack.register(kc)
-      if (!vc.isPrimitive) msgpack.register(vc)
-    } match {
-      case Failure(err) => log.warn(("Failed to register key/value class (%s/%s) with MsgPack. " +
-        "Falling back to default MsgPack serialization, or 'toString' as last resort. " +
-        "Exception: %s").format(kc, vc, err.getMessage))
-    }
-    */
-    rdd.map{ pair =>
-      Try {
-        msgpack.write(pair)
-      } match {
-        case Failure(err) =>
-          Try {
-            write((pair._1.toString, pair._2.toString))
-          } match {
-            case Success(result) => result
-            case Failure(e) => throw e
-          }
-        case Success(result) => result
-
-      }
-      //write(_)
-    }
-  }
-
-  // SequenceFile converted to Text and then to String
+  // SequenceFile
   def sequenceFile[K ,V](sc: JavaSparkContext,
                          path: String,
                          keyClass: String,
@@ -339,37 +295,8 @@ private[spark] object PythonRDD extends Logging {
     val vc = vcm.erasure.asInstanceOf[Class[V]]
 
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
-    val converted = convertRDD[K, V](rdd)
-    JavaRDD.fromRDD(serMsgPack[K, V](converted))
-    //JavaRDD.fromRDD(
-    //  .map{ case (a, b) => (a.toString, b.toString) }.map(stuff => write(stuff)))
-  }
-
-  /*
-  def sequenceFile[K, V](sc: JavaSparkContext,
-                         path: String,
-                         keyWrapper: String,
-                         valueWrapper: String,
-                         minSplits: Int): JavaRDD[Array[Byte]] = {
-    val rdd = sc.sc.sequenceFile(path, classOf[Any], classOf[Any], minSplits)
-    val converted = convertRDD[K, V](rdd)
-    JavaRDD.fromRDD(serMsgPack[K, V](converted))
-    //sequenceFile(sc, path, "java.lang.String", "java.lang.String", keyWrapper, valueWrapper, minSplits)
-  }
-  */
-
-  def mapToConf(map: java.util.HashMap[String, String]) = {
-    import collection.JavaConversions._
-    val conf = new Configuration()
-    map.foreach{ case (k, v) => conf.set(k, v) }
-    conf
-  }
-
-  /* Merges two configurations, keys from right overwrite any matching keys in left */
-  def mergeConfs(left: Configuration, right: Configuration) = {
-    import collection.JavaConversions._
-    right.iterator().foreach(entry => left.set(entry.getKey, entry.getValue))
-    left
+    val converted = SerDeUtil.convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
   }
 
   // Arbitrary Hadoop InputFormat, key class and value class
@@ -381,19 +308,14 @@ private[spark] object PythonRDD extends Logging {
                                       keyWrapper: String,
                                       valueWrapper: String,
                                       confAsMap: java.util.HashMap[String, String]) = {
-    val conf = mapToConf(confAsMap)
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val baseConf = sc.hadoopConfiguration()
-    val mergedConf = mergeConfs(baseConf, conf)
+    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
     val rdd =
       newHadoopFileFromClassNames[K, V, F](sc,
         path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper, mergedConf)
-      //.map{ case (k, v) => (k.toString, v.toString) }
-    val converted = convertRDD[K, V](rdd)
-    JavaRDD.fromRDD(serMsgPack[K, V](converted))
-    //JavaPairRDD.fromRDD(
-    //  newHadoopFileFromClassNames(sc, path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper)
-    //  .map(new PairFunction[(K, V), String, String] { def call(t: (K, V)) = (t._1.toString, t._2.toString) } )
-    //)
+    val converted = SerDeUtil.convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
   }
 
   private def newHadoopFileFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
@@ -413,82 +335,6 @@ private[spark] object PythonRDD extends Logging {
     sc.sc.newAPIHadoopFile(path, fc, kc, vc, conf)
   }
 
-  /*
-  private def sequenceFile[K, V](sc: JavaSparkContext,
-                                 path: String,
-                                 keyClazz: String,
-                                 valueClazz: String,
-                                 keyWrapper: String,
-                                 valueWrapper: String,
-                                 minSplits: Int) = {
-    implicit val kcm = ClassManifest.fromClass(Class.forName("Any")).asInstanceOf[ClassManifest[K]]
-    implicit val vcm = ClassManifest.fromClass(Class.forName("Any")).asInstanceOf[ClassManifest[V]]
-    val kc = kcm.erasure.asInstanceOf[Class[K]]
-    val vc = vcm.erasure.asInstanceOf[Class[V]]
-
-    val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
-    val converted = convertRDD[K, V](rdd)
-    JavaRDD.fromRDD(serMsgPack[K, V](converted))
-
-    /*
-    val rdd = if (kc.isInstanceOf[Writable] && vc.isInstanceOf[Writable]) {
-      val writables = sc.sc.sequenceFile(path, kc.asInstanceOf[Class[Writable]], vc.asInstanceOf[Class[Writable]], minSplits)
-      val w = writables.map{case (k,v) => (t.convert(k), t.convert(v))}
-      //implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K <:< Writable]]
-      //ClassManifest.fromClass(kc.asInstanceOf[Class[Writable]])
-      //sequenceFileWritable(sc, path ,minSplits).asInstanceOf[RDD[(K, V)]]
-      //sequenceFileWritable(sc, kc, vc, path, minSplits)
-    }
-    else {
-      sc.sc.sequenceFile[K, V](path, minSplits)
-
-    }
-
-    */
-  }
-  */
-
-  private def convertRDD[K, V](rdd: RDD[(K, V)]) = {
-    rdd.map{
-      case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V])
-      case (k: Writable, v) => (convert(k).asInstanceOf[K], v.asInstanceOf[V])
-      case (k, v: Writable) => (k.asInstanceOf[K], convert(v).asInstanceOf[V])
-      case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V])
-    }
-  }
-
-  private def convert(writable: Writable): Any = {
-    writable match {
-      case iw: IntWritable => SparkContext.intWritableConverter().convert(iw)
-      case dw: DoubleWritable => SparkContext.doubleWritableConverter().convert(dw)
-      case lw: LongWritable => SparkContext.longWritableConverter().convert(lw)
-      case fw: FloatWritable => SparkContext.floatWritableConverter().convert(fw)
-      case t: Text => SparkContext.stringWritableConverter().convert(t)
-      case bw: BooleanWritable => SparkContext.booleanWritableConverter().convert(bw)
-      case byw: BytesWritable => SparkContext.bytesWritableConverter().convert(byw)
-      case n: NullWritable => None
-      case aw: ArrayWritable => aw.get().map(convert(_))
-      case mw: MapWritable => mw.map{ case (k, v) => (convert(k), convert(v)) }.toMap
-      case other => other
-    }
-  }
-
-  /*
-  def sequenceFileWritable[K, V](sc: JavaSparkContext,
-                                 path: String,
-                                 minSplits: Int)
-                                 //(implicit km: ClassManifest[K], vm: ClassManifest[V])
-                                 // kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
-    = {
-
-    import SparkContext._
-    implicit val kcm = ClassManifest.fromClass(keyClazz) //.asInstanceOf[ClassManifest[K]]
-    //implicit val vcm = ClassManifest.fromClass(valueClazz) //.asInstanceOf[ClassManifest[V]]
-    sc.sc.sequenceFile(path) //, kc, vc, minSplits)
-    // JavaRDD.fromRDD(serMsgPack[K, V](rdd))
-  }
-  */
-
   //
 
   def writeToStream(elem: Any, dataOut: DataOutputStream)(implicit m: ClassManifest[Any]) {
@@ -503,9 +349,6 @@ private[spark] object PythonRDD extends Logging {
         dataOut.write(b)
       case str: String =>
         dataOut.writeUTF(str)
-      //case (a: String, b: String) =>
-      //  dataOut.writeUTF(a)
-      //  dataOut.writeUTF(b)
       case other =>
         throw new SparkException("Unexpected element type " + other.getClass)
     }
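
For orientation, a rough sketch of how the refactored sequenceFile entry point above is reached from the JVM side; in practice PySpark makes the equivalent call through Py4J (see the context.py diff below), and the path and class names here are placeholders:

    // Sketch only -- placeholder arguments, mirroring the call made from python/pyspark/context.py.
    val jrdd = PythonRDD.sequenceFile(
      jsc,                                  // JavaSparkContext
      "hdfs:///tmp/example.seq",            // placeholder path
      "org.apache.hadoop.io.IntWritable",   // key class name, passed as a String
      "org.apache.hadoop.io.Text",          // value class name, passed as a String
      "", "",                               // keyWrapper / valueWrapper
      2)                                    // minSplits
    // jrdd is a JavaRDD[Array[Byte]]: each element is a MsgPack-serialized (key, value) pair.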
core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+package org.apache.spark.api.python
+
+import org.msgpack.ScalaMessagePack
+import scala.util.Try
+import org.apache.spark.rdd.RDD
+import java.io.Serializable
+import org.apache.spark.{SparkContext, Logging}
+import org.apache.hadoop.io._
+import scala.util.Success
+import scala.util.Failure
+
+/**
+ * Utilities for serialization / deserialization between Python and Java, using MsgPack.
+ * Also contains utilities for converting [[org.apache.hadoop.io.Writable]] -> Scala objects and primitives
+ */
+private[python] object SerDeUtil extends Logging {
+
+  def register[T](clazz: Class[T], msgpack: ScalaMessagePack) {
+    Try {
+      log.info("%s".format(clazz))
+      clazz match {
+        case c if c.isPrimitive =>
+        case c if c.isInstanceOf[java.lang.String] =>
+        case _ => msgpack.register(clazz)
+      }
+    }.getOrElse(log.warn("Failed to register class (%s) with MsgPack. ".format(clazz.getName) +
+      "Falling back to default MsgPack serialization, or 'toString' as last resort"))
+  }
+
+  // serialize and RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack
+  def serMsgPack[K, V](rdd: RDD[(K, V)]) = {
+    import org.msgpack.ScalaMessagePack._
+    val msgpack = new ScalaMessagePack with Serializable
+    val first = rdd.first()
+    val kc = ClassManifest.fromClass(first._1.getClass).asInstanceOf[ClassManifest[K]].erasure.asInstanceOf[Class[K]]
+    val vc = ClassManifest.fromClass(first._2.getClass).asInstanceOf[ClassManifest[V]].erasure.asInstanceOf[Class[V]]
+    register(kc, msgpack)
+    register(vc, msgpack)
+    rdd.map{ pair =>
+      Try {
+        msgpack.write(pair)
+      } match {
+        case Failure(err) =>
+          Try {
+            write((pair._1.toString, pair._2.toString))
+          } match {
+            case Success(result) => result
+            case Failure(e) => throw e
+          }
+        case Success(result) => result
+      }
+    }
+  }
+
+  def convertRDD[K, V](rdd: RDD[(K, V)]) = {
+    rdd.map{
+      case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V])
+      case (k: Writable, v) => (convert(k).asInstanceOf[K], v.asInstanceOf[V])
+      case (k, v: Writable) => (k.asInstanceOf[K], convert(v).asInstanceOf[V])
+      case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V])
+    }
+  }
+
+  def convert(writable: Writable): Any = {
+    import collection.JavaConversions._
+    writable match {
+      case iw: IntWritable => SparkContext.intWritableConverter().convert(iw)
+      case dw: DoubleWritable => SparkContext.doubleWritableConverter().convert(dw)
+      case lw: LongWritable => SparkContext.longWritableConverter().convert(lw)
+      case fw: FloatWritable => SparkContext.floatWritableConverter().convert(fw)
+      case t: Text => SparkContext.stringWritableConverter().convert(t)
+      case bw: BooleanWritable => SparkContext.booleanWritableConverter().convert(bw)
+      case byw: BytesWritable => SparkContext.bytesWritableConverter().convert(byw)
+      case n: NullWritable => None
+      case aw: ArrayWritable => aw.get().map(convert(_))
+      case mw: MapWritable => mw.map{ case (k, v) => (convert(k), convert(v)) }.toMap
+      case other => other
+    }
+  }
+
+}
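
In short, SerDeUtil.convert unwraps common Hadoop Writables into plain Scala/Java values before MsgPack serialization, falling back to the object itself (and ultimately to toString inside serMsgPack) for anything it does not recognize. A few illustrative cases, based on the match above:

    import org.apache.hadoop.io.{IntWritable, Text, NullWritable}

    SerDeUtil.convert(new IntWritable(42))   // 42: Int
    SerDeUtil.convert(new Text("foo"))       // "foo": String
    SerDeUtil.convert(NullWritable.get())    // None
    // Any Writable not matched above falls through the `case other => other` branch unchanged.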

python/pyspark/context.py

Lines changed: 7 additions & 4 deletions
@@ -225,22 +225,25 @@ def sequenceFileAsText(self, name):
         jrdd = self._jvm.PythonRDD.sequenceFileAsText(self._jsc, name)
         return RDD(jrdd, self, MsgPackDeserializer()) # MsgPackDeserializer PairMUTF8Deserializer
 
-    def sequenceFile(self, name, keyClass, valueClass, keyWrapper="", valueWrapper="", minSplits=None):
+    def sequenceFile(self, name, keyClass="org.apache.hadoop.io.Text", valueClass="org.apache.hadoop.io.Text",
+                     keyWrapper="", valueWrapper="", minSplits=None):
         """
         Read a Hadoopp SequenceFile with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI,
         and return it as an RDD of (String, String) where the key and value representations
         are generated using the 'toString()' method of the relevant Java class.
 
-        >>> sc.sequenceFile("/tmp/spark/test/sfint/").collect()
+        >>> sc.sequenceFile("test_support/data/sfint/").collect()
         [(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
-        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyClass, valueClass, keyWrapper, valueWrapper, minSplits)
+        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyClass, valueClass, keyWrapper, valueWrapper,
+                                                minSplits)
         #jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyWrapper, valueWrapper, minSplits)
         return RDD(jrdd, self, MsgPackDeserializer()) # MsgPackDeserializer PairMUTF8Deserializer
 
-    def newHadoopFile(self, name, inputFormat, keyClass, valueClass, keyWrapper="toString", valueWrapper="toString", conf = {}):
+    def newHadoopFile(self, name, inputFormat, keyClass, valueClass, keyWrapper="toString", valueWrapper="toString",
+                      conf = {}):
         """
         Read a Hadoopp file with arbitrary InputFormat, key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI,
8 Bytes
Binary file not shown.
12 Bytes
Binary file not shown.
12 Bytes
Binary file not shown.

python/test_support/data/sfdouble/_SUCCESS

Whitespace-only changes.
145 Bytes
Binary file not shown.
145 Bytes
Binary file not shown.

0 commit comments
