@@ -53,13 +53,13 @@ class FlumePollingInputDStream[T: ClassTag](
     val maxBatchSize: Int,
     val parallelism: Int,
     storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[SparkPollingEvent](ssc_) {
+  ) extends ReceiverInputDStream[SparkFlumePollingEvent](ssc_) {
   /**
    * Gets the receiver object that will be sent to the worker nodes
    * to receive data. This method needs to be defined by any specific implementation
    * of a NetworkInputDStream.
    */
-  override def getReceiver(): Receiver[SparkPollingEvent] = {
+  override def getReceiver(): Receiver[SparkFlumePollingEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
   }
 }
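The rename above also changes the element type this (package-private) DStream exposes to the rest of the streaming module. A rough usage sketch follows; the constructor arguments before maxBatchSize are not visible in this hunk, so the StreamingContext and sink-address parameters below are inferred from the FlumePollingReceiver call and should be treated as assumptions.

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical wiring from inside org.apache.spark.streaming (the class is
// private[streaming]); the leading parameter order is assumed, not shown above.
val ssc = new StreamingContext("local[2]", "FlumePollingSketch", Seconds(5))
val addresses = Seq(new InetSocketAddress("flume-sink-host", 9999))
val pollingStream = new FlumePollingInputDStream[SparkFlumePollingEvent](
  ssc, addresses, maxBatchSize = 1000, parallelism = 5,
  StorageLevel.MEMORY_AND_DISK_SER_2)
// Each record is now a SparkFlumePollingEvent wrapping a SparkSinkEvent.
pollingStream.map(e => new String(e.event.getBody.array())).print()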
@@ -69,7 +69,7 @@ private[streaming] class FlumePollingReceiver(
     maxBatchSize: Int,
     parallelism: Int,
     storageLevel: StorageLevel
-  ) extends Receiver[SparkPollingEvent](storageLevel) with Logging {
+  ) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging {

   lazy val channelFactoryExecutor =
     Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
@@ -105,7 +105,7 @@ private[streaming] class FlumePollingReceiver(
       logDebug("Received batch of " + events.size() + " events with sequence number: " + seq)
       try {
         // Convert each Flume event to a serializable SparkFlumePollingEvent
-        events.foreach(event => store(SparkPollingEvent.fromSparkSinkEvent(event)))
+        events.foreach(event => store(SparkFlumePollingEvent.fromSparkSinkEvent(event)))
         // Send an ack to Flume so that Flume discards the events from its channels.
         client.ack(seq)
       } catch {
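For context, the try/catch above is the pull-and-ack half of a simple transaction with the sink: only after every event in the batch has been stored does the receiver ack, which tells the sink it may discard the batch from its channel; the failure path (cut off by this hunk) would tell the sink to redeliver. Below is a schematic sketch of that loop, where getEventBatch, nack, and the EventBatch accessors are assumed protocol names, not taken from this diff.

import scala.collection.JavaConversions._

// Schematic poll/store/ack loop inside the receiver; the method names on
// `client` and `batch` are assumptions about the Avro protocol.
def pollOnce(client: SparkFlumeProtocol.Callback, maxBatchSize: Int): Unit = {
  val batch = client.getEventBatch(maxBatchSize)  // assumed RPC name
  val seq = batch.getSequenceNumber               // assumed accessor
  try {
    batch.getEvents.foreach { event =>            // assumed accessor
      store(SparkFlumePollingEvent.fromSparkSinkEvent(event))
    }
    client.ack(seq)     // success: the sink commits and drops the batch
  } catch {
    case e: Exception =>
      client.nack(seq)  // assumed failure path: the sink rolls back
      logWarning("Error while storing batch with sequence number: " + seq, e)
  }
}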
@@ -129,7 +129,7 @@ private[streaming] class FlumePollingReceiver(
     }
   }

-  override def store(dataItem: SparkPollingEvent) {
+  override def store(dataItem: SparkFlumePollingEvent) {
     // Not entirely sure store is thread-safe for all storage levels - so wrap it in synchronized
     // This takes a performance hit, since the parallelism is useful only for pulling data now.
     this.synchronized {
@@ -155,9 +155,9 @@ private[streaming] class FlumePollingReceiver(
 private class FlumeConnection(val transceiver: NettyTransceiver,
   val client: SparkFlumeProtocol.Callback)

-private[streaming] object SparkPollingEvent {
-  def fromSparkSinkEvent(in: SparkSinkEvent): SparkPollingEvent = {
-    val event = new SparkPollingEvent()
+private[streaming] object SparkFlumePollingEvent {
+  def fromSparkSinkEvent(in: SparkSinkEvent): SparkFlumePollingEvent = {
+    val event = new SparkFlumePollingEvent()
     event.event = in
     event
   }
@@ -167,7 +167,7 @@ private[streaming] object SparkPollingEvent {
  * SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper
  * around that to make it externalizable.
  */
-class SparkPollingEvent() extends Externalizable with Logging {
+class SparkFlumePollingEvent() extends Externalizable with Logging {
   var event: SparkSinkEvent = new SparkSinkEvent()

   /* De-serialize from bytes. */
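The doc comment above explains why the wrapper exists at all: the Avro-generated SparkSinkEvent is not Java-serializable, so SparkFlumePollingEvent implements Externalizable and hand-rolls the byte-level encoding. The snippet below is a minimal illustration of that pattern, not the commit's actual implementation; the ExternalizableEvent class and its (body, headers) layout are invented for the example.

import java.io.{Externalizable, ObjectInput, ObjectOutput}

// Illustrative stand-in for SparkFlumePollingEvent: serialize a body and a
// header map by hand so the class round-trips through Java serialization.
class ExternalizableEvent(var body: Array[Byte], var headers: Map[String, String])
  extends Externalizable {

  // The Externalizable contract requires a public no-arg constructor.
  def this() = this(Array.emptyByteArray, Map.empty)

  /* Serialize to bytes: body length, body, then each header key/value pair. */
  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeInt(body.length)
    out.write(body)
    out.writeInt(headers.size)
    headers.foreach { case (k, v) => out.writeUTF(k); out.writeUTF(v) }
  }

  /* De-serialize from bytes, reading fields in the order they were written. */
  override def readExternal(in: ObjectInput): Unit = {
    val buf = new Array[Byte](in.readInt())
    in.readFully(buf)
    body = buf
    headers = (0 until in.readInt()).map(_ => in.readUTF() -> in.readUTF()).toMap
  }
}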