File tree Expand file tree Collapse file tree 1 file changed +2
-4
lines changed Expand file tree Collapse file tree 1 file changed +2
-4
lines changed Original file line number Diff line number Diff line change 1717
1818from collections import defaultdict
1919from itertools import chain , ifilter , imap
20- import time
2120import operator
2221
2322from pyspark .serializers import NoOpSerializer ,\
@@ -246,8 +245,6 @@ def takeAndPrint(rdd, time):
246245 taken = rdd .take (11 )
247246 print "-------------------------------------------"
248247 print "Time: %s" % (str (time ))
249- print rdd .glom ().collect ()
250- print "-------------------------------------------"
251248 print "-------------------------------------------"
252249 for record in taken [:10 ]:
253250 print record
@@ -447,6 +444,7 @@ def pipeline_func(split, iterator):
447444 self ._prev_jdstream = prev ._prev_jdstream # maintain the pipeline
448445 self ._prev_jrdd_deserializer = prev ._prev_jrdd_deserializer
449446 self .is_cached = False
447+ self .is_checkpointed = False
450448 self ._ssc = prev ._ssc
451449 self .ctx = prev .ctx
452450 self .prev = prev
@@ -483,4 +481,4 @@ def _jdstream(self):
483481 return self ._jdstream_val
484482
485483 def _is_pipelinable (self ):
486- return not self .is_cached
484+ return not ( self .is_cached or self . is_checkpointed )
You can’t perform that action at this time.
0 commit comments