@@ -17,12 +17,13 @@
 
 from collections import defaultdict
 from itertools import chain, ifilter, imap
-import time
 import operator
 
 from pyspark.serializers import NoOpSerializer,\
     BatchedSerializer, CloudPickleSerializer, pack_long
 from pyspark.rdd import _JavaStackTrace
+from pyspark.storagelevel import StorageLevel
+from pyspark.resultiterable import ResultIterable
 
 from py4j.java_collections import ListConverter, MapConverter
 
@@ -35,6 +36,8 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
         self._ssc = ssc
         self.ctx = ssc._sc
         self._jrdd_deserializer = jrdd_deserializer
+        self.is_cached = False
+        self.is_checkpointed = False
 
     def context(self):
         """
@@ -234,8 +237,6 @@ def takeAndPrint(rdd, time):
             taken = rdd.take(11)
             print "-------------------------------------------"
             print "Time: %s" % (str(time))
-            print rdd.glom().collect()
-            print "-------------------------------------------"
             print "-------------------------------------------"
             for record in taken[:10]:
                 print record
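With the debug glom() dump and the duplicate separator removed, each batch now prints a header with the batch time followed by at most ten records. A rough sketch of the resulting console output for a hypothetical batch of (word, count) pairs (timestamp and values are illustrative, not from the source):

    -------------------------------------------
    Time: 2014-07-20 12:00:01
    -------------------------------------------
    (u'hello', 4)
    (u'world', 2)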
@@ -290,32 +291,65 @@ def get_output(rdd, time):
 
         self.foreachRDD(get_output)
 
-    def _test_switch_dserializer(self, serializer_que):
+    def cache(self):
+        """
+        Persist this DStream with the default storage level (C{MEMORY_ONLY_SER}).
+        """
+        self.is_cached = True
+        self.persist(StorageLevel.MEMORY_ONLY_SER)
+        return self
+
+    def persist(self, storageLevel):
+        """
+        Set this DStream's storage level to persist its values across operations
+        after the first time it is computed. This can only be used to assign
+        a new storage level if the DStream does not have a storage level set yet.
+        """
+        self.is_cached = True
+        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
+        self._jdstream.persist(javaStorageLevel)
+        return self
+
+    def checkpoint(self, interval):
294314 """
295- Deserializer is dynamically changed based on numSlice and the number of
296- input. This function choose deserializer. Currently this is just FIFO.
315+ Mark this DStream for checkpointing. It will be saved to a file inside the
316+ checkpoint directory set with L{SparkContext.setCheckpointDir()}
317+
318+ I am not sure this part in DStream
319+ and
320+ all references to its parent RDDs will be removed. This function must
321+ be called before any job has been executed on this RDD. It is strongly
322+ recommended that this RDD is persisted in memory, otherwise saving it
323+ on a file will require recomputation.
324+
325+ interval must be pysprak.streaming.duration
297326 """
298-
299- jrdd_deserializer = self ._jrdd_deserializer
327+ self .is_checkpointed = True
328+ self ._jdstream .checkpoint (interval )
329+ return self
330+
+    def groupByKey(self, numPartitions=None):
+        def createCombiner(x):
+            return [x]
 
-        def switch(rdd, jtime):
-            try:
-                print serializer_que
-                jrdd_deserializer = serializer_que.pop(0)
-                print jrdd_deserializer
-            except Exception as e:
-                print e
+        def mergeValue(xs, x):
+            xs.append(x)
+            return xs
 
-        self.foreachRDD(switch)
+        def mergeCombiners(a, b):
+            a.extend(b)
+            return a
 
+        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
+                                 numPartitions).mapValues(lambda x: ResultIterable(x))
 
 
 # TODO: implement groupByKey
+# TODO: implement saveAsTextFile
+
+# The following operations depend on transform:
 # TODO: implement union
-# TODO: implement cache
-# TODO: implement persist
 # TODO: implement repartitions
-# TODO: implement saveAsTextFile
 # TODO: implement cogroup
 # TODO: implement join
 # TODO: implement countByValue
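For context, a minimal usage sketch of the methods added above, assuming a running StreamingContext bound to ssc; the socket source, host, port, and the flatMap/map operators are assumptions used only to illustrate the API, not part of this commit:

    # hypothetical input DStream of text lines
    lines = ssc.socketTextStream("localhost", 9999)
    pairs = lines.flatMap(lambda line: line.split(" ")) \
                 .map(lambda word: (word, 1))

    pairs.cache()                   # same as persist(StorageLevel.MEMORY_ONLY_SER)
    grouped = pairs.groupByKey()    # per batch: (word, ResultIterable of 1s)
    counts = grouped.mapValues(lambda ones: sum(ones))
    counts.pprint()                 # batch header plus up to ten records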
@@ -342,6 +376,7 @@ def pipeline_func(split, iterator):
         self._prev_jdstream = prev._prev_jdstream  # maintain the pipeline
         self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
         self.is_cached = False
+        self.is_checkpointed = False
         self._ssc = prev._ssc
         self.ctx = prev.ctx
         self.prev = prev
@@ -378,4 +413,4 @@ def _jdstream(self):
         return self._jdstream_val
 
     def _is_pipelinable(self):
-        return not self.is_cached
+        return not (self.is_cached or self.is_checkpointed)
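The _is_pipelinable change mirrors PipelinedRDD in pyspark/rdd.py: consecutive transformations are fused into a single PipelinedDStream only while nothing forces the intermediate stream to be materialized, and a checkpointed stream must now be preserved just like a cached one. A rough illustration of the rule, assuming map/filter operators and hypothetical functions f and g:

    s = stream.map(f)
    s.filter(g)        # s._is_pipelinable() is True, so filter is fused into
                       # the same PipelinedDStream as map
    s.cache()          # sets s.is_cached = True
    s.filter(g)        # now s._is_pipelinable() is False, so filter wraps the
                       # cached stream instead of extending its pipeline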