@@ -22,8 +22,6 @@ import java.net.InetSocketAddress
 import java.nio.ByteBuffer
 import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
 
-import org.apache.spark.flume.sink.SparkSinkUtils
-
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
@@ -33,45 +31,44 @@ import org.apache.avro.ipc.specific.SpecificRequestor
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
 
 import org.apache.spark.Logging
-import org.apache.spark.flume.{SparkSinkEvent, SparkFlumeProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.flume.sink._
+
 
 /**
  * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
  * [[org.apache.spark.flume.sink.SparkSink]]s.
- * @param ssc_ Streaming context that will execute this input stream
+ * @param _ssc Streaming context that will execute this input stream
  * @param addresses List of addresses at which SparkSinks are listening
  * @param maxBatchSize Maximum size of a batch
  * @param parallelism Number of parallel connections to open
  * @param storageLevel The storage level to use.
  * @tparam T Class type of the object of this stream
  */
+private[streaming]
 class FlumePollingInputDStream[T: ClassTag](
-    @transient ssc_ : StreamingContext,
-    val addresses: Seq[InetSocketAddress],
-    val maxBatchSize: Int,
-    val parallelism: Int,
-    storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[SparkFlumePollingEvent](ssc_) {
-  /**
-   * Gets the receiver object that will be sent to the worker nodes
-   * to receive data. This method needs to defined by any specific implementation
-   * of a NetworkInputDStream.
-   */
+    @transient _ssc: StreamingContext,
+    val addresses: Seq[InetSocketAddress],
+    val maxBatchSize: Int,
+    val parallelism: Int,
+    storageLevel: StorageLevel
+  ) extends ReceiverInputDStream[SparkFlumePollingEvent](_ssc) {
+
   override def getReceiver(): Receiver[SparkFlumePollingEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
   }
 }
 
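For context, a usage sketch (not part of this diff): with the DStream now private[streaming], applications would obtain it through a factory such as FlumeUtils.createPollingStream rather than constructing it directly. The host, port, batch interval, and the map over event bodies below are illustrative assumptions.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    val conf = new SparkConf().setAppName("FlumePollingExample")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Pull events from the Spark sink running inside the Flume agent at sink-host:9999.
    val events = FlumeUtils.createPollingStream(ssc, "sink-host", 9999)
    events.map(e => new String(e.event.getBody.array())).print()
    ssc.start()
    ssc.awaitTermination()
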
-private[streaming] class FlumePollingReceiver(
-    addresses: Seq[InetSocketAddress],
-    maxBatchSize: Int,
-    parallelism: Int,
-    storageLevel: StorageLevel
-  ) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging {
+private[streaming]
+class FlumePollingReceiver(
+    addresses: Seq[InetSocketAddress],
+    maxBatchSize: Int,
+    parallelism: Int,
+    storageLevel: StorageLevel
+  ) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging {
 
   lazy val channelFactoryExecutor =
     Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
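The expression above (truncated by the hunk boundary) is the familiar Guava pattern for a cached pool of daemon threads, so lingering Netty channel threads cannot keep the JVM alive at shutdown. A self-contained sketch of that pattern, with an assumed thread-name format:

    import java.util.concurrent.Executors
    import com.google.common.util.concurrent.ThreadFactoryBuilder

    // Daemon threads let the JVM exit even if the pool is never shut down explicitly;
    // the name format exists only to make thread dumps readable.
    val channelFactoryExecutor = Executors.newCachedThreadPool(
      new ThreadFactoryBuilder()
        .setDaemon(true)
        .setNameFormat("Flume Receiver Channel Thread - %d")
        .build())
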
@@ -150,14 +147,6 @@ private[streaming] class FlumePollingReceiver(
     }
   }
 
-  override def store(dataItem: SparkFlumePollingEvent) {
-    // Not entirely sure store is thread-safe for all storage levels - so wrap it in synchronized
-    // This takes a performance hit, since the parallelism is useful only for pulling data now.
-    this.synchronized {
-      super.store(dataItem)
-    }
-  }
-
   override def onStop(): Unit = {
     logInfo("Shutting down Flume Polling Receiver")
     receiverExecutor.shutdownNow()
@@ -176,6 +165,9 @@ private[streaming] class FlumePollingReceiver(
 private class FlumeConnection(val transceiver: NettyTransceiver,
   val client: SparkFlumeProtocol.Callback)
 
+/**
+ * Companion object of [[SparkFlumePollingEvent]]
+ */
 private[streaming] object SparkFlumePollingEvent {
   def fromSparkSinkEvent(in: SparkSinkEvent): SparkFlumePollingEvent = {
     val event = new SparkFlumePollingEvent()
@@ -189,7 +181,7 @@ private[streaming] object SparkFlumePollingEvent {
  * SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper
  * around that to make it externalizable.
  */
-class SparkFlumePollingEvent() extends Externalizable with Logging {
+class SparkFlumePollingEvent extends Externalizable with Logging {
   var event: SparkSinkEvent = new SparkSinkEvent()
 
   /* De-serialize from bytes. */
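To make the wrapper idea in that comment concrete, here is a trimmed sketch of how an Externalizable wrapper around an Avro-generated event typically writes and reads the body bytes and header map by hand. The field layout is a simplified assumption for illustration, not the exact body of SparkFlumePollingEvent.

    import java.io.{Externalizable, ObjectInput, ObjectOutput}

    class ExternalizableEventSketch extends Externalizable {
      var body: Array[Byte] = Array.empty
      var headers: Map[String, String] = Map.empty

      // Write the body length and bytes, then each header key/value pair.
      override def writeExternal(out: ObjectOutput): Unit = {
        out.writeInt(body.length)
        out.write(body)
        out.writeInt(headers.size)
        headers.foreach { case (k, v) =>
          out.writeUTF(k)
          out.writeUTF(v)
        }
      }

      // Read the fields back in exactly the order they were written.
      override def readExternal(in: ObjectInput): Unit = {
        body = new Array[Byte](in.readInt())
        in.readFully(body)
        headers = (0 until in.readInt()).map(_ => (in.readUTF(), in.readUTF())).toMap
      }
    }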