
Commit 3a4777c

Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff.
1 parent 838dd39


46 files changed: +322 additions, -282 deletions


examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala

Lines changed: 3 additions & 3 deletions

@@ -26,8 +26,8 @@ import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
 import org.apache.spark.{SparkConf, SecurityManager}
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
-import org.apache.spark.streaming.receivers.Receiver
 import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming.receiver.ActorHelper
 
 case class SubscribeReceiver(receiverActor: ActorRef)
 case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -81,14 +81,14 @@ class FeederActor extends Actor {
  * @see [[org.apache.spark.streaming.examples.FeederActor]]
  */
 class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
-extends Actor with Receiver {
+extends Actor with ActorHelper {
 
   lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
 
   override def preStart = remotePublisher ! SubscribeReceiver(context.self)
 
   def receive = {
-    case msg => pushBlock(msg.asInstanceOf[T])
+    case msg => store(msg.asInstanceOf[T])
   }
 
   override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
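Taken together, the ActorWordCount changes show the new contract for actor-based receivers: mix in org.apache.spark.streaming.receiver.ActorHelper instead of the old receivers.Receiver trait, and hand data to Spark with store() instead of pushBlock(). A minimal sketch of that pattern (EchoReceiver is an illustrative name, not part of this commit):

import scala.reflect.ClassTag
import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

// Sketch only: a bare-bones actor receiver against the renamed API.
// ActorHelper supplies store(), which replaces the old pushBlock().
class EchoReceiver[T: ClassTag] extends Actor with ActorHelper {
  def receive = {
    case msg => store(msg.asInstanceOf[T]) // formerly pushBlock(msg.asInstanceOf[T])
  }
}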

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

Lines changed: 4 additions & 4 deletions

@@ -35,17 +35,17 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.Logging
-import org.apache.spark.streaming.receiver.NetworkReceiver
+import org.apache.spark.streaming.receiver.Receiver
 
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
   @transient ssc_ : StreamingContext,
   host: String,
   port: Int,
   storageLevel: StorageLevel
-) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
-  override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+  override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumeReceiver(host, port, storageLevel)
   }
 }
@@ -135,7 +135,7 @@ class FlumeReceiver(
   host: String,
   port: Int,
   storageLevel: StorageLevel
-) extends NetworkReceiver[SparkFlumeEvent](storageLevel) with Logging {
+) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val responder = new SpecificResponder(
     classOf[AvroSourceProtocol], new FlumeEventServer(this))
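The Flume changes also illustrate the shape every built-in source takes after the rename: a ReceiverInputDStream subclass whose only job is to construct its Receiver via getReceiver(). A schematic of that pattern (MyInputDStream and makeReceiver are illustrative names, not part of this commit):

import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

// Schematic: the DStream subclass only wires up its Receiver; all data
// movement lives in the Receiver implementation itself.
class MyInputDStream[T: ClassTag](
    @transient ssc_ : StreamingContext,
    storageLevel: StorageLevel,
    makeReceiver: StorageLevel => Receiver[T]
  ) extends ReceiverInputDStream[T](ssc_) {

  override def getReceiver(): Receiver[T] = makeReceiver(storageLevel)
}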

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala

Lines changed: 5 additions & 5 deletions

@@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaNetworkInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.{NetworkInputDStream, DStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
 
 object FlumeUtils {
   /**
@@ -35,7 +35,7 @@ object FlumeUtils {
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): NetworkInputDStream[SparkFlumeEvent] = {
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
     val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
     inputStream
   }
@@ -50,7 +50,7 @@ object FlumeUtils {
      jssc: JavaStreamingContext,
      hostname: String,
      port: Int
-    ): JavaNetworkInputDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port)
   }
 
@@ -65,7 +65,7 @@ object FlumeUtils {
      hostname: String,
      port: Int,
      storageLevel: StorageLevel
-    ): JavaNetworkInputDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port, storageLevel)
   }
 }
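On the caller side, the createStream signatures are unchanged apart from the declared return type. A usage sketch against the Scala API above (the app name, host, port, and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

object FlumeCount {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(new SparkConf().setAppName("FlumeCount"), Seconds(2))
    // Same call as before the commit; only the static type changed.
    val events: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createStream(ssc, "localhost", 41414)
    events.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}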

external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java

Lines changed: 3 additions & 4 deletions

@@ -19,17 +19,16 @@
 
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
 
-import org.apache.spark.streaming.api.java.JavaNetworkInputDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.junit.Test;
 
 public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testFlumeStream() {
     // tests the API, does not actually test data receiving
-    JavaNetworkInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
-    JavaNetworkInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
+    JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+    JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
         StorageLevel.MEMORY_AND_DISK_SER_2());
   }
 }

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala

Lines changed: 3 additions & 3 deletions

@@ -31,7 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.streaming.api.java.JavaNetworkInputDStream
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
 
 class FlumeStreamSuite extends TestSuiteBase {
 
@@ -40,11 +40,11 @@ class FlumeStreamSuite extends TestSuiteBase {
   test("flume input stream") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: JavaNetworkInputDStream[SparkFlumeEvent] =
+    val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream.networkInputDStream, outputBuffer)
+    val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
     outputStream.register()
     ssc.start()
 

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

Lines changed: 5 additions & 5 deletions

@@ -33,7 +33,7 @@ import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.NetworkReceiver
+import org.apache.spark.streaming.receiver.Receiver
 
 /**
  * Input stream that pulls messages from a Kafka Broker.
@@ -54,11 +54,11 @@ class KafkaInputDStream[
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
-  ) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
+  ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
-  def getReceiver(): NetworkReceiver[(K, V)] = {
+  def getReceiver(): Receiver[(K, V)] = {
     new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-      .asInstanceOf[NetworkReceiver[(K, V)]]
+      .asInstanceOf[Receiver[(K, V)]]
   }
 }
 
@@ -71,7 +71,7 @@ class KafkaReceiver[
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
-  ) extends NetworkReceiver[Any](storageLevel) with Logging {
+  ) extends Receiver[Any](storageLevel) with Logging {
 
   // Connection to Kafka
   var consumerConnector : ConsumerConnector = null

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 7 additions & 7 deletions

@@ -27,8 +27,8 @@ import kafka.serializer.{Decoder, StringDecoder}
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairNetworkInputDStream, JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.dstream.{NetworkInputDStream, DStream}
+import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
 
 
 object KafkaUtils {
@@ -48,7 +48,7 @@ object KafkaUtils {
      groupId: String,
      topics: Map[String, Int],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): NetworkInputDStream[(String, String)] = {
+    ): ReceiverInputDStream[(String, String)] = {
     val kafkaParams = Map[String, String](
       "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
       "zookeeper.connection.timeout.ms" -> "10000")
@@ -70,7 +70,7 @@ object KafkaUtils {
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
      storageLevel: StorageLevel
-    ): NetworkInputDStream[(K, V)] = {
+    ): ReceiverInputDStream[(K, V)] = {
     new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
   }
 
@@ -88,7 +88,7 @@ object KafkaUtils {
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt]
-    ): JavaPairNetworkInputDStream[String, String] = {
+    ): JavaPairReceiverInputDStream[String, String] = {
     implicit val cmt: ClassTag[String] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
@@ -110,7 +110,7 @@ object KafkaUtils {
      groupId: String,
      topics: JMap[String, JInt],
      storageLevel: StorageLevel
-    ): JavaPairNetworkInputDStream[String, String] = {
+    ): JavaPairReceiverInputDStream[String, String] = {
     implicit val cmt: ClassTag[String] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -139,7 +139,7 @@ object KafkaUtils {
      kafkaParams: JMap[String, String],
      topics: JMap[String, JInt],
      storageLevel: StorageLevel
-    ): JavaPairNetworkInputDStream[K, V] = {
+    ): JavaPairReceiverInputDStream[K, V] = {
     implicit val keyCmt: ClassTag[K] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
     implicit val valueCmt: ClassTag[V] =
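The Kafka helpers follow suit; here is a caller-side sketch of the simplest overload shown above (the ZooKeeper address, group id, and topic map are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaCount {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(new SparkConf().setAppName("KafkaCount"), Seconds(2))
    // createStream now returns ReceiverInputDStream[(String, String)]
    // of (key, message) pairs, per the signature in the diff above.
    val messages: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(ssc, "localhost:2181", "example-group", Map("my-topic" -> 1))
    messages.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}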

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 4 additions & 5 deletions

@@ -19,29 +19,28 @@
 
 import java.util.HashMap;
 
-import org.apache.spark.streaming.api.java.JavaPairNetworkInputDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
 import org.junit.Test;
 import com.google.common.collect.Maps;
 import kafka.serializer.StringDecoder;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
 
 public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testKafkaStream() {
     HashMap<String, Integer> topics = Maps.newHashMap();
 
     // tests the API, does not actually test data receiving
-    JavaPairNetworkInputDStream<String, String> test1 =
+    JavaPairReceiverInputDStream<String, String> test1 =
         KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
-    JavaPairNetworkInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
+    JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
        StorageLevel.MEMORY_AND_DISK_SER_2());
 
     HashMap<String, String> kafkaParams = Maps.newHashMap();
     kafkaParams.put("zookeeper.connect", "localhost:12345");
     kafkaParams.put("group.id","consumer-group");
-    JavaPairNetworkInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
+    JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
       String.class, String.class, StringDecoder.class, StringDecoder.class,
       kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
   }

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 4 additions & 4 deletions

@@ -20,7 +20,7 @@ package org.apache.spark.streaming.kafka
 import kafka.serializer.StringDecoder
 import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.NetworkInputDStream
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 class KafkaStreamSuite extends TestSuiteBase {
 
@@ -29,12 +29,12 @@ class KafkaStreamSuite extends TestSuiteBase {
     val topics = Map("my-topic" -> 1)
 
     // tests the API, does not actually test data receiving
-    val test1: NetworkInputDStream[(String, String)] =
+    val test1: ReceiverInputDStream[(String, String)] =
       KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
-    val test2: NetworkInputDStream[(String, String)] =
+    val test2: ReceiverInputDStream[(String, String)] =
       KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
     val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
-    val test3: NetworkInputDStream[(String, String)] =
+    val test3: ReceiverInputDStream[(String, String)] =
       KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
         ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
 
external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala

Lines changed: 4 additions & 4 deletions

@@ -39,7 +39,7 @@ import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.NetworkReceiver
+import org.apache.spark.streaming.receiver.Receiver
 
 /**
  * Input stream that subscribe messages from a Mqtt Broker.
@@ -55,9 +55,9 @@ class MQTTInputDStream(
    brokerUrl: String,
    topic: String,
    storageLevel: StorageLevel
-  ) extends NetworkInputDStream[String](ssc_) with Logging {
+  ) extends ReceiverInputDStream[String](ssc_) with Logging {
 
-  def getReceiver(): NetworkReceiver[String] = {
+  def getReceiver(): Receiver[String] = {
     new MQTTReceiver(brokerUrl, topic, storageLevel)
   }
 }
@@ -67,7 +67,7 @@ class MQTTReceiver(
    brokerUrl: String,
    topic: String,
    storageLevel: StorageLevel
-  ) extends NetworkReceiver[String](storageLevel) {
+  ) extends Receiver[String](storageLevel) {
 
   def onStop() {
 
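For receivers written directly against the renamed base class, the hunks above show only the constructor and onStop(); a complete receiver also implements onStart() and pushes records with store(). A sketch assuming the full Receiver API of this refactor (onStart() and isStopped() are not visible in the diff above):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch: emits a fixed string once per second until the receiver is stopped.
class ConstantReceiver(value: String)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  def onStart() {
    new Thread("constant-receiver") {
      override def run() {
        while (!isStopped()) {
          store(value)    // replaces the old pushBlock-style calls
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop() { }        // nothing to clean up; the loop checks isStopped()
}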