Skip to content

Commit 798dff7

Browse files
dragostdas
authored andcommitted
[SPARK-8975] [STREAMING] Adds a mechanism to send a new rate from the driver to the block generator
First step for [SPARK-7398](https://issues.apache.org/jira/browse/SPARK-7398). tdas huitseeker Author: Iulian Dragos <[email protected]> Author: François Garillot <[email protected]> Closes apache#7471 from dragos/topic/streaming-bp/dynamic-rate and squashes the following commits: 8941cf9 [Iulian Dragos] Renames and other nitpicks. 162d9e5 [Iulian Dragos] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior). 210f495 [Iulian Dragos] Revert "Added a few tests that measure the receiver’s rate." 0c51959 [Iulian Dragos] Added a few tests that measure the receiver’s rate. 261a051 [Iulian Dragos] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually` cd1397d [Iulian Dragos] Add a test for the propagation of a new rate limit from driver to receivers. 6369b30 [Iulian Dragos] Merge pull request #15 from huitseeker/SPARK-8975 d15de42 [François Garillot] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate 4721c7d [François Garillot] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
1 parent fe26584 commit 798dff7

File tree

8 files changed

+153
-8
lines changed

8 files changed

+153
-8
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,32 @@ import org.apache.spark.{Logging, SparkConf}
3434
*/
3535
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
3636

37-
private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
38-
private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)
37+
// treated as an upper limit
38+
private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
39+
private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble)
3940

4041
def waitToPush() {
41-
if (desiredRate > 0) {
42-
rateLimiter.acquire()
43-
}
42+
rateLimiter.acquire()
4443
}
44+
45+
/**
46+
* Return the current rate limit. If no limit has been set so far, it returns {{{Long.MaxValue}}}.
47+
*/
48+
def getCurrentLimit: Long =
49+
rateLimiter.getRate.toLong
50+
51+
/**
52+
* Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
53+
* {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
54+
*
55+
* @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
56+
*/
57+
private[receiver] def updateRate(newRate: Long): Unit =
58+
if (newRate > 0) {
59+
if (maxRateLimit > 0) {
60+
rateLimiter.setRate(newRate.min(maxRateLimit))
61+
} else {
62+
rateLimiter.setRate(newRate)
63+
}
64+
}
4565
}

streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
271271
}
272272

273273
/** Get the attached executor. */
274-
private def executor = {
274+
private def executor: ReceiverSupervisor = {
275275
assert(executor_ != null, "Executor has not been attached to this receiver")
276276
executor_
277277
}

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ import org.apache.spark.streaming.Time
2323
private[streaming] sealed trait ReceiverMessage extends Serializable
2424
private[streaming] object StopReceiver extends ReceiverMessage
2525
private[streaming] case class CleanupOldBlocks(threshTime: Time) extends ReceiverMessage
26-
26+
private[streaming] case class UpdateRateLimit(elementsPerSecond: Long)
27+
extends ReceiverMessage

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ private[streaming] abstract class ReceiverSupervisor(
5959
/** Time between a receiver is stopped and started again */
6060
private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)
6161

62+
/** The current maximum rate limit for this receiver. */
63+
private[streaming] def getCurrentRateLimit: Option[Long] = None
64+
6265
/** Exception associated with the stopping of the receiver */
6366
@volatile protected var stoppingError: Throwable = null
6467

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ private[streaming] class ReceiverSupervisorImpl(
7777
case CleanupOldBlocks(threshTime) =>
7878
logDebug("Received delete old batch signal")
7979
cleanupOldBlocks(threshTime)
80+
case UpdateRateLimit(eps) =>
81+
logInfo(s"Received a new rate limit: $eps.")
82+
blockGenerator.updateRate(eps)
8083
}
8184
})
8285

@@ -98,6 +101,9 @@ private[streaming] class ReceiverSupervisorImpl(
98101
}
99102
}, streamId, env.conf)
100103

104+
override private[streaming] def getCurrentRateLimit: Option[Long] =
105+
Some(blockGenerator.getCurrentLimit)
106+
101107
/** Push a single record of received data into block generator. */
102108
def pushSingle(data: Any) {
103109
blockGenerator.addData(data)

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.{Logging, SparkEnv, SparkException}
2626
import org.apache.spark.rpc._
2727
import org.apache.spark.streaming.{StreamingContext, Time}
2828
import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl,
29-
StopReceiver}
29+
StopReceiver, UpdateRateLimit}
3030
import org.apache.spark.util.SerializableConfiguration
3131

3232
/**
@@ -226,6 +226,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
226226
logError(s"Deregistered receiver for stream $streamId: $messageWithError")
227227
}
228228

229+
/** Update a receiver's maximum ingestion rate */
230+
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = {
231+
for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) {
232+
eP.send(UpdateRateLimit(newRate))
233+
}
234+
}
235+
229236
/** Add new blocks for the given stream */
230237
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
231238
receivedBlockTracker.addBlock(receivedBlockInfo)
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.receiver
19+
20+
import org.apache.spark.SparkConf
21+
import org.apache.spark.SparkFunSuite
22+
23+
/** Testsuite for testing the network receiver behavior */
24+
class RateLimiterSuite extends SparkFunSuite {
25+
26+
test("rate limiter initializes even without a maxRate set") {
27+
val conf = new SparkConf()
28+
val rateLimiter = new RateLimiter(conf){}
29+
rateLimiter.updateRate(105)
30+
assert(rateLimiter.getCurrentLimit == 105)
31+
}
32+
33+
test("rate limiter updates when below maxRate") {
34+
val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "110")
35+
val rateLimiter = new RateLimiter(conf){}
36+
rateLimiter.updateRate(105)
37+
assert(rateLimiter.getCurrentLimit == 105)
38+
}
39+
40+
test("rate limiter stays below maxRate despite large updates") {
41+
val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "100")
42+
val rateLimiter = new RateLimiter(conf){}
43+
rateLimiter.updateRate(105)
44+
assert(rateLimiter.getCurrentLimit === 100)
45+
}
46+
}

streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,17 @@
1717

1818
package org.apache.spark.streaming.scheduler
1919

20+
import org.scalatest.concurrent.Eventually._
21+
import org.scalatest.concurrent.Timeouts
22+
import org.scalatest.time.SpanSugar._
2023
import org.apache.spark.streaming._
2124
import org.apache.spark.SparkConf
2225
import org.apache.spark.storage.StorageLevel
2326
import org.apache.spark.streaming.receiver._
2427
import org.apache.spark.util.Utils
28+
import org.apache.spark.streaming.dstream.InputDStream
29+
import scala.reflect.ClassTag
30+
import org.apache.spark.streaming.dstream.ReceiverInputDStream
2531

2632
/** Testsuite for receiver scheduling */
2733
class ReceiverTrackerSuite extends TestSuiteBase {
@@ -72,8 +78,64 @@ class ReceiverTrackerSuite extends TestSuiteBase {
7278
assert(locations(0).length === 1)
7379
assert(locations(3).length === 1)
7480
}
81+
82+
test("Receiver tracker - propagates rate limit") {
83+
object ReceiverStartedWaiter extends StreamingListener {
84+
@volatile
85+
var started = false
86+
87+
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
88+
started = true
89+
}
90+
}
91+
92+
ssc.addStreamingListener(ReceiverStartedWaiter)
93+
ssc.scheduler.listenerBus.start(ssc.sc)
94+
95+
val newRateLimit = 100L
96+
val inputDStream = new RateLimitInputDStream(ssc)
97+
val tracker = new ReceiverTracker(ssc)
98+
tracker.start()
99+
100+
// we wait until the Receiver has registered with the tracker,
101+
// otherwise our rate update is lost
102+
eventually(timeout(5 seconds)) {
103+
assert(ReceiverStartedWaiter.started)
104+
}
105+
tracker.sendRateUpdate(inputDStream.id, newRateLimit)
106+
// this is an async message, we need to wait a bit for it to be processed
107+
eventually(timeout(3 seconds)) {
108+
assert(inputDStream.getCurrentRateLimit.get === newRateLimit)
109+
}
110+
}
75111
}
76112

113+
/** An input DStream with a hard-coded receiver that gives access to internals for testing. */
114+
private class RateLimitInputDStream(@transient ssc_ : StreamingContext)
115+
extends ReceiverInputDStream[Int](ssc_) {
116+
117+
override def getReceiver(): DummyReceiver = SingletonDummyReceiver
118+
119+
def getCurrentRateLimit: Option[Long] = {
120+
invokeExecutorMethod.getCurrentRateLimit
121+
}
122+
123+
private def invokeExecutorMethod: ReceiverSupervisor = {
124+
val c = classOf[Receiver[_]]
125+
val ex = c.getDeclaredMethod("executor")
126+
ex.setAccessible(true)
127+
ex.invoke(SingletonDummyReceiver).asInstanceOf[ReceiverSupervisor]
128+
}
129+
}
130+
131+
/**
132+
* A Receiver as an object so we can read its rate limit.
133+
*
134+
* @note It's necessary to be a top-level object, or else serialization would create another
135+
* one on the executor side and we won't be able to read its rate limit.
136+
*/
137+
private object SingletonDummyReceiver extends DummyReceiver
138+
77139
/**
78140
* Dummy receiver implementation
79141
*/

0 commit comments

Comments
 (0)