1717
1818package org .apache .spark .streaming .dstream
1919
20-
21- import java .io ._
20+ import java .io .{IOException , ObjectInputStream , ObjectOutputStream }
2221
2322import scala .deprecated
2423import scala .collection .mutable .HashMap
2524import scala .reflect .ClassTag
26- import java .io .{IOException , ObjectInputStream , ObjectOutputStream }
27- import scala .util .control .Breaks ._
2825
2926import org .apache .spark .{Logging , SparkException }
3027import org .apache .spark .rdd .{BlockRDD , RDD }
@@ -34,7 +31,6 @@ import org.apache.spark.streaming.StreamingContext._
3431import org .apache .spark .streaming .scheduler .Job
3532import org .apache .spark .util .MetadataCleaner
3633import org .apache .spark .streaming .Duration
37- import org .apache .spark .api .python .PythonRDD
3834
3935/**
4036 * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -562,11 +558,9 @@ abstract class DStream[T: ClassTag] (
562558 // DStreams can't be serialized with closures, we can't proactively check
563559 // it for serializability and so we pass the optional false to SparkContext.clean
564560
565- // serialized python
566561 val cleanedF = context.sparkContext.clean(transformFunc, false )
567562 val realTransformFunc = (rdds : Seq [RDD [_]], time : Time ) => {
568563 assert(rdds.length == 1 )
569- // if transformfunc is fine, it is okay
570564 cleanedF(rdds.head.asInstanceOf [RDD [T ]], time)
571565 }
572566 new TransformedDStream [U ](Seq (this ), realTransformFunc)
0 commit comments