1 file changed, +15 −0 lines changed

from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.context import SparkContext
from pyspark.streaming.dstream import DStream
+from pyspark.streaming.duration import Duration

from py4j.java_collections import ListConverter

@@ -107,6 +108,20 @@ def awaitTermination(self, timeout=None):
        else:
            self._jssc.awaitTermination(timeout)

+    def remember(self, duration):
+        """
+        Set each DStream in this context to remember the RDDs it generated in the last given duration.
+        DStreams remember RDDs only for a limited duration of time and release them for garbage
+        collection. This method allows the developer to specify how long to remember the RDDs
+        (if the developer wishes to query old data outside the DStream computation).
+        @param duration: pyspark.streaming.duration.Duration object.
+                         Minimum duration that each DStream should remember its RDDs.
+        """
+        if not isinstance(duration, Duration):
+            raise TypeError("Input should be pyspark.streaming.duration.Duration object")
+
+        self._jssc.remember(duration._jduration)
+
    # TODO: add storageLevel
    def socketTextStream(self, hostname, port):
        """
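
For context, a minimal usage sketch of the new remember() method under stated assumptions: ssc is an already-constructed pyspark.streaming.context.StreamingContext, and Seconds is assumed to be a Duration helper in pyspark.streaming.duration (only Duration itself appears in this diff).

from pyspark.streaming.duration import Duration, Seconds  # Seconds is an assumed helper

# ssc: an existing StreamingContext (construction not shown here).
# Keep the RDDs generated by every DStream in this context for at least
# 60 seconds, so old data can still be queried outside the regular
# DStream computation.
ssc.remember(Seconds(60))

# Anything other than a Duration is rejected by the isinstance check
# added in this patch.
try:
    ssc.remember(60)
except TypeError as exc:
    print(exc)  # Input should be pyspark.streaming.duration.Duration object

The strict type check mirrors the underlying Scala StreamingContext.remember(duration: Duration) call that self._jssc.remember() forwards to.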