Commit 04c37b6

tdas authored and pwendell committed
[SPARK-1332] Improve Spark Streaming's Network Receiver and InputDStream API [WIP]
The current NetworkReceiver API makes it slightly complicated to write a new receiver, because one needs to create an instance of BlockGenerator, as shown in SocketReceiver (https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L51). Exposing the BlockGenerator interface has made it harder to improve the receiving process. The NetworkReceiver API (which was not a very stable API anyway) needs to change if we are to ensure future stability.

Additionally, the functions such as streamingContext.socketStream that create input streams return DStream objects. That makes it hard to expose functionality (say, rate limits) unique to input dstreams; they should return InputDStream or NetworkInputDStream instead. This part is not yet implemented.

This PR is blocked on the graceful shutdown PR #247.

Author: Tathagata Das <[email protected]>

Closes #300 from tdas/network-receiver-api and squashes the following commits:

ea27b38 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3a4777c [Tathagata Das] Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff.
838dd39 [Tathagata Das] Added more events to the StreamingListener to report errors and stopped receivers.
a75c7a6 [Tathagata Das] Address some PR comments and fixed other issues.
91bfa72 [Tathagata Das] Fixed bugs.
8533094 [Tathagata Das] Scala style fixes.
028bde6 [Tathagata Das] Further refactored receiver to allow restarting of a receiver.
43f5290 [Tathagata Das] Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java.
2c94579 [Tathagata Das] Fixed graceful shutdown by removing interrupts on receiving thread.
9e37a0b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3223e95 [Tathagata Das] Refactored the code that runs the NetworkReceiver into further classes and traits to make them more testable.
a36cc48 [Tathagata Das] Refactored the NetworkReceiver API for future stability.
1 parent 5a5b334 commit 04c37b6
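
To make the shape of the refactored API concrete, below is a minimal sketch of a custom receiver written against the new org.apache.spark.streaming.receiver.Receiver class introduced by this commit. The socket source, the background thread, and the use of isStopped() are illustrative assumptions for the sketch; what the diffs below actually confirm is subclassing Receiver[T](storageLevel), implementing onStart()/onStop(), and pushing records with store(), with no hand-managed BlockGenerator.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical line-oriented socket receiver against the refactored API:
// records are handed off with store() instead of a manually created BlockGenerator.
class CustomSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  @volatile private var socket: Socket = null

  def onStart() {
    // onStart() must return quickly, so receive on a separate thread.
    new Thread("Custom Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Closing the socket lets the receiving thread fall out of its read loop.
    if (socket != null) socket.close()
  }

  private def receive() {
    socket = new Socket(host, port)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
    var line = reader.readLine()
    while (!isStopped && line != null) {
      store(line)  // hand each record to Spark Streaming
      line = reader.readLine()
    }
    reader.close()
  }
}

A production receiver would also handle connection errors; the commit log mentions support for restarting a receiver, but that part of the API surface is not shown in these diffs, so it is omitted from the sketch.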

File tree: 55 files changed (+1836, -731 lines)

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 5 additions & 1 deletion
@@ -121,7 +121,11 @@ private[spark] object UIUtils extends Logging {
         (records, "")
       }
     }
-    "%.1f%s".formatLocal(Locale.US, value, unit)
+    if (unit.isEmpty) {
+      "%d".formatLocal(Locale.US, value)
+    } else {
+      "%.1f%s".formatLocal(Locale.US, value, unit)
+    }
   }
 
   // Yarn has to go through a proxy so the base uri is provided and has to be on all links

examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala

Lines changed: 3 additions & 3 deletions
@@ -26,8 +26,8 @@ import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
 import org.apache.spark.{SparkConf, SecurityManager}
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
-import org.apache.spark.streaming.receivers.Receiver
 import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming.receiver.ActorHelper
 
 case class SubscribeReceiver(receiverActor: ActorRef)
 case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -81,14 +81,14 @@ class FeederActor extends Actor {
  * @see [[org.apache.spark.streaming.examples.FeederActor]]
  */
 class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
-  extends Actor with Receiver {
+  extends Actor with ActorHelper {
 
   lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
 
   override def preStart = remotePublisher ! SubscribeReceiver(context.self)
 
   def receive = {
-    case msg => pushBlock(msg.asInstanceOf[T])
+    case msg => store(msg.asInstanceOf[T])
   }
 
   override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

Lines changed: 14 additions & 14 deletions
@@ -34,16 +34,18 @@ import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
 
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
   @transient ssc_ : StreamingContext,
   host: String,
   port: Int,
   storageLevel: StorageLevel
-) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
-  override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+  override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumeReceiver(host, port, storageLevel)
   }
 }
@@ -115,13 +117,13 @@ private[streaming] object SparkFlumeEvent {
 private[streaming]
 class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
   override def append(event : AvroFlumeEvent) : Status = {
-    receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
+    receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
     Status.OK
   }
 
   override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
     events.foreach (event =>
-      receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
+      receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
     Status.OK
   }
 }
@@ -133,23 +135,21 @@ class FlumeReceiver(
     host: String,
     port: Int,
     storageLevel: StorageLevel
-  ) extends NetworkReceiver[SparkFlumeEvent] {
+  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
-  lazy val blockGenerator = new BlockGenerator(storageLevel)
+  lazy val responder = new SpecificResponder(
+    classOf[AvroSourceProtocol], new FlumeEventServer(this))
+  lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
 
-  protected override def onStart() {
-    val responder = new SpecificResponder(
-      classOf[AvroSourceProtocol], new FlumeEventServer(this))
-    val server = new NettyServer(responder, new InetSocketAddress(host, port))
-    blockGenerator.start()
+  def onStart() {
     server.start()
     logInfo("Flume receiver started")
   }
 
-  protected override def onStop() {
-    blockGenerator.stop()
+  def onStop() {
+    server.close()
     logInfo("Flume receiver stopped")
   }
 
-  override def getLocationPreference = Some(host)
+  override def preferredLocation = Some(host)
 }

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala

Lines changed: 5 additions & 5 deletions
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
 
 object FlumeUtils {
   /**
@@ -35,7 +35,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[SparkFlumeEvent] = {
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
     val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
     inputStream
   }
@@ -50,7 +50,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       hostname: String,
       port: Int
-    ): JavaDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port)
   }
 
@@ -65,7 +65,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel
-    ): JavaDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port, storageLevel)
   }
 }
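
For context, a hypothetical driver program using the more specific return type introduced above; the application name, batch interval, and Flume port are illustrative assumptions, not part of this commit.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

object FlumeEventCountSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("FlumeEventCountSketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // createStream now returns ReceiverInputDStream[SparkFlumeEvent] rather than a plain
    // DStream, so receiver-specific functionality (say, rate limits) can later be exposed
    // on the returned stream without changing this call site.
    val events: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createStream(ssc, "localhost", 41414, StorageLevel.MEMORY_AND_DISK_SER_2)

    events.count().map(count => "Received " + count + " Flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}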

external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java

Lines changed: 3 additions & 3 deletions
@@ -19,16 +19,16 @@
 
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
 
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.junit.Test;
 
 public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testFlumeStream() {
     // tests the API, does not actually test data receiving
-    JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
-    JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
+    JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+    JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
       StorageLevel.MEMORY_AND_DISK_SER_2());
   }
 }

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala

Lines changed: 4 additions & 2 deletions
@@ -31,6 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
 
 class FlumeStreamSuite extends TestSuiteBase {
 
@@ -39,10 +40,11 @@ class FlumeStreamSuite extends TestSuiteBase {
   test("flume input stream") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+    val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
     outputStream.register()
     ssc.start()

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

Lines changed: 7 additions & 12 deletions
@@ -33,6 +33,7 @@ import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
 
 /**
  * Input stream that pulls messages from a Kafka Broker.
@@ -53,11 +54,11 @@ class KafkaInputDStream[
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     storageLevel: StorageLevel
-  ) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
+  ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
-  def getReceiver(): NetworkReceiver[(K, V)] = {
+  def getReceiver(): Receiver[(K, V)] = {
     new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-      .asInstanceOf[NetworkReceiver[(K, V)]]
+      .asInstanceOf[Receiver[(K, V)]]
   }
 }
 
@@ -70,21 +71,15 @@ class KafkaReceiver[
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     storageLevel: StorageLevel
-  ) extends NetworkReceiver[Any] {
+  ) extends Receiver[Any](storageLevel) with Logging {
 
-  // Handles pushing data into the BlockManager
-  lazy protected val blockGenerator = new BlockGenerator(storageLevel)
   // Connection to Kafka
   var consumerConnector : ConsumerConnector = null
 
-  def onStop() {
-    blockGenerator.stop()
-  }
+  def onStop() { }
 
   def onStart() {
 
-    blockGenerator.start()
-
     // In case we are using multiple Threads to handle Kafka Messages
     val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
 
@@ -130,7 +125,7 @@ class KafkaReceiver[
     def run() {
       logInfo("Starting MessageHandler.")
       for (msgAndMetadata <- stream) {
-        blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
+        store((msgAndMetadata.key, msgAndMetadata.message))
      }
    }
  }

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 7 additions & 7 deletions
@@ -27,8 +27,8 @@ import kafka.serializer.{Decoder, StringDecoder}
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
 
 
 object KafkaUtils {
@@ -48,7 +48,7 @@ object KafkaUtils {
       groupId: String,
       topics: Map[String, Int],
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[(String, String)] = {
+    ): ReceiverInputDStream[(String, String)] = {
     val kafkaParams = Map[String, String](
       "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
       "zookeeper.connection.timeout.ms" -> "10000")
@@ -70,7 +70,7 @@ object KafkaUtils {
       kafkaParams: Map[String, String],
       topics: Map[String, Int],
       storageLevel: StorageLevel
-    ): DStream[(K, V)] = {
+    ): ReceiverInputDStream[(K, V)] = {
     new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
   }
 
@@ -88,7 +88,7 @@ object KafkaUtils {
       zkQuorum: String,
       groupId: String,
       topics: JMap[String, JInt]
-    ): JavaPairDStream[String, String] = {
+    ): JavaPairReceiverInputDStream[String, String] = {
     implicit val cmt: ClassTag[String] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
@@ -110,7 +110,7 @@ object KafkaUtils {
       groupId: String,
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
-    ): JavaPairDStream[String, String] = {
+    ): JavaPairReceiverInputDStream[String, String] = {
     implicit val cmt: ClassTag[String] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -139,7 +139,7 @@ object KafkaUtils {
       kafkaParams: JMap[String, String],
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
-    ): JavaPairDStream[K, V] = {
+    ): JavaPairReceiverInputDStream[K, V] = {
     implicit val keyCmt: ClassTag[K] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
     implicit val valueCmt: ClassTag[V] =

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 6 additions & 4 deletions
@@ -18,27 +18,29 @@
 package org.apache.spark.streaming.kafka;
 
 import java.util.HashMap;
+
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
 import org.junit.Test;
 import com.google.common.collect.Maps;
 import kafka.serializer.StringDecoder;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
 
 public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testKafkaStream() {
     HashMap<String, Integer> topics = Maps.newHashMap();
 
     // tests the API, does not actually test data receiving
-    JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
-    JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
+    JavaPairReceiverInputDStream<String, String> test1 =
+      KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
+    JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
      StorageLevel.MEMORY_AND_DISK_SER_2());
 
     HashMap<String, String> kafkaParams = Maps.newHashMap();
     kafkaParams.put("zookeeper.connect", "localhost:12345");
     kafkaParams.put("group.id","consumer-group");
-    JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc,
+    JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
      String.class, String.class, StringDecoder.class, StringDecoder.class,
      kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
   }

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 7 additions & 3 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka
 import kafka.serializer.StringDecoder
 import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 class KafkaStreamSuite extends TestSuiteBase {
 
@@ -28,10 +29,13 @@ class KafkaStreamSuite extends TestSuiteBase {
     val topics = Map("my-topic" -> 1)
 
     // tests the API, does not actually test data receiving
-    val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
-    val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test1: ReceiverInputDStream[(String, String)] =
+      KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
+    val test2: ReceiverInputDStream[(String, String)] =
+      KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
     val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
-    val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+    val test3: ReceiverInputDStream[(String, String)] =
+      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
 
     // TODO: Actually test receiving data
