@@ -76,7 +76,7 @@ private[streaming] class FlumePollingReceiver(
7676 lazy val receiverExecutor = Executors .newFixedThreadPool(parallelism,
7777 new ThreadFactoryBuilder ().setDaemon(true ).setNameFormat(" Flume Receiver Thread - %d" ).build())
7878
79- private val connections = new LinkedBlockingQueue [FlumeConnection ]()
79+ private lazy val connections = new LinkedBlockingQueue [FlumeConnection ]()
8080
8181 override def onStart (): Unit = {
8282 // Create the connections to each Flume agent.
@@ -102,11 +102,11 @@ private[streaming] class FlumePollingReceiver(
102102 logDebug(
103103 " Received batch of " + events.size() + " events with sequence number: " + seq)
104104 try {
105- // Convert each Flume event to a serializable SparkPollingEvent
105+ // Convert each Flume event to a serializable SparkFlumeEvent
106106 val buffer = new ArrayBuffer [SparkFlumeEvent ](events.size())
107107 var j = 0
108108 while (j < events.size()) {
109- buffer += sparkSinkEventToSparkFlumeEvent (events(j))
109+ buffer += toSparkFlumeEvent (events(j))
110110 j += 1
111111 }
112112 store(buffer)
@@ -156,9 +156,9 @@ private[streaming] class FlumePollingReceiver(
156156 /**
157157 * Utility method to convert [[SparkSinkEvent ]] to [[SparkFlumeEvent ]]
158158 * @param event - Event to convert to SparkFlumeEvent
159- * @return - The SparkSinkEvent generated from Spar
159+ * @return - The SparkFlumeEvent generated from SparkSinkEvent
160160 */
161- private def sparkSinkEventToSparkFlumeEvent (event : SparkSinkEvent ): SparkFlumeEvent = {
161+ private def toSparkFlumeEvent (event : SparkSinkEvent ): SparkFlumeEvent = {
162162 val sparkFlumeEvent = new SparkFlumeEvent ()
163163 sparkFlumeEvent.event.setBody(event.getBody)
164164 sparkFlumeEvent.event.setHeaders(event.getHeaders)
0 commit comments