Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,16 @@ class BernoulliCellSampler[T](lb: Double, ub: Double, complement: Boolean = fals
* @tparam T item type
*/
@DeveloperApi
class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
class BernoulliSampler[T: ClassTag](fraction: Double,
private val rng: Random = RandomSampler.newDefaultRNG)
extends RandomSampler[T, T] {

/** epsilon slop to avoid failure from floating point jitter */
require(
fraction >= (0.0 - RandomSampler.roundingEpsilon)
&& fraction <= (1.0 + RandomSampler.roundingEpsilon),
s"Sampling fraction ($fraction) must be on interval [0, 1]")

private val rng: Random = RandomSampler.newDefaultRNG

override def setSeed(seed: Long): Unit = rng.setSeed(seed)

override def sample(items: Iterator[T]): Iterator[T] = {
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
<fasterxml.jackson.version>2.4.4</fasterxml.jackson.version>
<snappy.version>1.1.1.7</snappy.version>
<netlib.java.version>1.1.2</netlib.java.version>
<reactivestreams.version>1.0.0</reactivestreams.version>

<test.java.home>${java.home}</test.java.home>

Expand Down Expand Up @@ -315,6 +316,11 @@
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>${reactivestreams.version}</version>
</dependency>
<dependency>
<groupId>${jline.groupid}</groupId>
<artifactId>jline</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions streaming/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not add additional dependencies to the streaming module. Any additional dependencies can go in separate modules.

<!-- End of shaded deps. -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener, LatestSpeedListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.{CallSite, Utils}

Expand Down Expand Up @@ -185,6 +185,8 @@ class StreamingContext private[streaming] (

private[streaming] val progressListener = new StreamingJobProgressListener(this)

private[streaming] val speedListener = new LatestSpeedListener(this.graph.batchDuration)

private[streaming] val uiTab: Option[StreamingTab] =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(new StreamingTab(this))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ import org.apache.spark.util.{SystemClock, Utils}

/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
/**
* Called when a new block of data is about to be generated by the block generator. The block
* generation and this callback are synchronized with the data addition and its associated
* callback, so the data addition waits for the block generation+callback to complete. This
* is useful for mutating data before a block is generated. Any long blocking operation in this
* callback will hurt the throughput.
*/
def onBlockGeneration(currentBuffer: ArrayBuffer[Any], newBlockBuffer: ArrayBuffer[Any]): Unit =
{}

/**
* Called after a data item is added into the BlockGenerator. The data addition and this
* callback are synchronized with the block generation and its associated callback,
Expand Down Expand Up @@ -79,7 +89,7 @@ private[streaming] class BlockGenerator(
private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

private val clock = new SystemClock()
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
private[receiver] val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
private val blockIntervalTimer =
new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
Expand Down Expand Up @@ -146,6 +156,7 @@ private[streaming] class BlockGenerator(
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[Any]
if (newBlockBuffer.size > 0) {
listener.onBlockGeneration(newBlockBuffer, currentBuffer)
val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
val newBlock = new Block(blockId, newBlockBuffer)
listener.onGenerateBlock(blockId)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.receiver

import scala.collection.mutable.ArrayBuffer

/**
 * Strategy for dealing with a large amount of data seen at a Receiver, which may
 * otherwise lead to an exhaustion of resources. See SPARK-7398.
 *
 * Any long blocking operation in an implementation will hurt the throughput.
 */
trait CongestionStrategy {

/**
 * Called on every batch interval with the estimated maximum number of
 * elements per block that can be processed in a batch interval,
 * based on the processing speed observed over the last batch.
 *
 * @param bound estimated maximum number of elements per block
 */
def onBlockBoundUpdate(bound: Int): Unit

/**
 * Given the data buffer intended for a block and the buffer for the following block,
 * mutates those buffers in place to an amount appropriate with respect to the
 * back-pressure information provided through `onBlockBoundUpdate`.
 *
 * @param currentBuffer elements collected for the block about to be generated
 * @param nextBuffer    buffer that will collect the elements of the following block
 */
def restrictCurrentBuffer(currentBuffer: ArrayBuffer[Any], nextBuffer: ArrayBuffer[Any]): Unit

}

/**
 * Base class for congestion strategies that carry a [[RateLimiter]].
 *
 * @param rateLimiter rate limiter held by the strategy; visible within the `receiver` package
 */
abstract class ThrottlingCongestionStrategy(private[receiver] val rateLimiter: RateLimiter)
extends CongestionStrategy
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.receiver

import scala.collection.mutable.ArrayBuffer
import java.util.concurrent.atomic.AtomicInteger
import java.util.Random
import org.apache.spark.Logging
import org.apache.spark.util.random.{RandomSampler, BernoulliSampler}

/**
 * A no-op congestion strategy: back-pressure information is discarded and
 * block buffers are never modified.
 *
 * @see CongestionStrategy
 */
class IgnoreCongestionStrategy extends CongestionStrategy {

  /** Discards the reported bound. */
  override def onBlockBoundUpdate(bound: Int): Unit = ()

  /** Leaves both buffers exactly as they were handed in. */
  override def restrictCurrentBuffer(
      currentBuffer: ArrayBuffer[Any],
      nextBuffer: ArrayBuffer[Any]): Unit = ()
}

/**
 * Congestion strategy that pushes excess elements back rather than discarding them.
 *
 * Each positive bound update re-tunes the rate limiter to the equivalent per-second rate.
 * When a block is about to be generated, any elements beyond the latest bound are moved to
 * the front of the buffer for the following block.
 *
 * @param blockGenerator block generator handed to [[ThrottlingCongestionStrategy]] as the rate
 *                       limiter; its `blockIntervalMs` converts a per-block bound into a
 *                       per-second rate
 */
class PushBackCongestionStrategy(blockGenerator: BlockGenerator)
  extends ThrottlingCongestionStrategy(blockGenerator) {

  /** Most recently reported per-block bound; non-positive values disable restriction. */
  private val latestBound = new AtomicInteger(-1)

  /**
   * Records the new bound and, when it is positive, updates the rate limiter accordingly.
   *
   * @param bound estimated maximum number of elements per block
   */
  override def onBlockBoundUpdate(bound: Int): Unit = {
    if (bound > 0) {
      // Scale in Long: `bound * 1000` overflows Int once bound exceeds ~2.1 million,
      // which would hand a negative or nonsensical rate to the limiter.
      val elementsPerSecond = bound.toLong * 1000L / blockGenerator.blockIntervalMs
      rateLimiter.updateRate(math.min(elementsPerSecond, Int.MaxValue.toLong).toInt)
    }
    latestBound.set(bound)
  }

  /**
   * Truncates `currentBuffer` to the latest bound and prepends the removed tail to
   * `nextBuffer`, so that no element is lost and ordering is preserved.
   *
   * @param currentBuffer elements collected for the block about to be generated
   * @param nextBuffer    buffer for the following block; receives the excess elements
   */
  override def restrictCurrentBuffer(
      currentBuffer: ArrayBuffer[Any],
      nextBuffer: ArrayBuffer[Any]): Unit = {
    val bound = latestBound.get()
    val excess = currentBuffer.size - bound
    if (bound > 0 && excess > 0) {
      // `++=:` is right-associative: it prepends its LEFT operand to its RIGHT operand.
      // The destination buffer therefore has to be on the right-hand side; the other way
      // round the excess lands in a temporary copy and is silently dropped.
      currentBuffer.takeRight(excess) ++=: nextBuffer
      currentBuffer.reduceToSize(bound)
    }
  }

}

/**
 * Congestion strategy that discards every element beyond the latest bound,
 * keeping only the first `bound` elements of the block being generated.
 */
class DropCongestionStrategy extends CongestionStrategy with Logging {

  /** Most recently reported per-block bound; non-positive values disable restriction. */
  private val latestBound = new AtomicInteger(-1)

  /** Records the new bound for use at the next block generation. */
  override def onBlockBoundUpdate(bound: Int): Unit = latestBound.set(bound)

  /**
   * Truncates `currentBuffer` to the latest bound when it holds more elements than that.
   *
   * @param currentBuffer elements collected for the block about to be generated
   * @param nextBuffer    buffer for the following block; not touched by this strategy
   */
  override def restrictCurrentBuffer(
      currentBuffer: ArrayBuffer[Any],
      nextBuffer: ArrayBuffer[Any]): Unit = {
    val bound = latestBound.get()
    val originalSize = currentBuffer.size
    if (bound > 0 && originalSize > bound) {
      // The ratio must be taken from the size BEFORE truncation; afterwards the buffer
      // holds exactly `bound` elements and the ratio would always read 1.00.
      val f = bound.toDouble / originalSize
      currentBuffer.reduceToSize(bound)

      logDebug(f"Prepared block by dropping with ratio of $f%2.2f.")
    }
  }

}

/**
 * Congestion strategy that thins an oversized block by Bernoulli sampling, keeping each
 * element with probability `bound / size` so that the block size approaches the bound.
 */
class SamplingCongestionStrategy extends CongestionStrategy with Logging {

  /** Random source handed to every sampler created by this strategy. */
  private val rng = RandomSampler.newDefaultRNG

  /** Most recently reported per-block bound. */
  private val latestBound = new AtomicInteger(-1)

  /** Records the new bound for use at the next block generation. */
  override def onBlockBoundUpdate(bound: Int): Unit = latestBound.set(bound)

  /**
   * Replaces the contents of `currentBuffer` with a Bernoulli sample of itself when the
   * ratio of bound to buffer size lies strictly between 0 and 1.
   *
   * @param currentBuffer elements collected for the block about to be generated
   * @param nextBuffer    buffer for the following block; not touched by this strategy
   */
  override def restrictCurrentBuffer(
      currentBuffer: ArrayBuffer[Any],
      nextBuffer: ArrayBuffer[Any]): Unit = {
    val bound = latestBound.get()
    // For an empty buffer or a non-positive bound this is NaN, infinite or <= 0,
    // all of which fail the range check below.
    val f = bound.toDouble / currentBuffer.size
    if (f > 0 && f < 1) {
      // The element type is spelled out: nothing in the constructor arguments constrains T,
      // so without it inference would settle on Nothing and `sample` could not accept an
      // Iterator[Any].
      val sampler = new BernoulliSampler[Any](f, rng)
      // Materialise the sample before clearing, since the iterator reads from the buffer.
      val sampled = sampler.sample(currentBuffer.toIterator).toArray

      currentBuffer.clear()
      currentBuffer ++= sampled

      logDebug(f"Prepared sampled block with ratio of $f%2.2f.")
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.streaming.receiver

import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.{Logging, SparkConf}

Expand All @@ -35,11 +36,21 @@ import org.apache.spark.{Logging, SparkConf}
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {

private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)
private var currentRate = new AtomicInteger(desiredRate)
private lazy val rateLimiter = GuavaRateLimiter.create(currentRate.get())

def waitToPush() {
if (desiredRate > 0) {
if (currentRate.get() > 0) {
rateLimiter.acquire()
}
}

private[receiver] def updateRate(newRate: Int): Unit =
if (newRate > 0) {
try {
currentRate.set(newRate)
} finally {
rateLimiter.setRate(newRate)
}
}
}
Loading