Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit e38edfb

Browse files
committed
Adds ReactiveReceiver abstract class
1 parent 5b39d95 commit e38edfb

File tree

4 files changed

+245
-0
lines changed

4 files changed

+245
-0
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@
162162
<fasterxml.jackson.version>2.4.4</fasterxml.jackson.version>
163163
<snappy.version>1.1.1.7</snappy.version>
164164
<netlib.java.version>1.1.2</netlib.java.version>
165+
<reactivestreams.version>1.0.0</reactivestreams.version>
165166

166167
<test.java.home>${java.home}</test.java.home>
167168

@@ -315,6 +316,11 @@
315316
</dependencies>
316317
<dependencyManagement>
317318
<dependencies>
319+
<dependency>
320+
<groupId>org.reactivestreams</groupId>
321+
<artifactId>reactive-streams</artifactId>
322+
<version>${reactivestreams.version}</version>
323+
</dependency>
318324
<dependency>
319325
<groupId>${jline.groupid}</groupId>
320326
<artifactId>jline</artifactId>

streaming/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@
6666
<groupId>org.eclipse.jetty</groupId>
6767
<artifactId>jetty-servlet</artifactId>
6868
</dependency>
69+
<dependency>
70+
<groupId>org.reactivestreams</groupId>
71+
<artifactId>reactive-streams</artifactId>
72+
</dependency>
6973
<!-- End of shaded deps. -->
7074

7175
<dependency>
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.receiver
19+
20+
import org.reactivestreams._
21+
import org.apache.spark.storage.StorageLevel
22+
import org.apache.spark.annotation.DeveloperApi
23+
import scala.collection.mutable.ArrayBuffer
24+
import scala.util.{Try, Success, Failure}
25+
import java.lang.IllegalStateException
26+
27+
/**
28+
* An abstract class thet helps in implementing a Receiver compliant with the
29+
* Reactive Streams API (http://www.reactive-streams.org/)
30+
*
31+
*
32+
* Classes deriving from this API must be used with the 'reactive'
33+
* congestion strategy setting. They can then communicate back-pressure through
34+
* the reactiveCongestionStrategy below, and receive new elements through onNext.
35+
*
36+
* Implementation checklist:
37+
*
38+
* - Implementations should acquire a subscription in onStart by calling
39+
* Publisher#subscribe(this) on a reactive Publisher.
40+
* - Implementations should override whenNext to configure custom behavior
41+
* upon receiving a new data element. They should call store in that method.
42+
* - Implementations should override whenError to configure custom behavior
43+
* upon failure. They should call on Stop in that method.
44+
* - Implementations should override whenComplete to configure custom behavior
45+
* upon completion. They should call onStop in that method.
46+
*
47+
* @see [[receiver.congestionStrategy]]
48+
* @see [[org.reactivestreams.Subscriber]]
49+
*/
50+
@DeveloperApi
51+
abstract class ReactiveReceiver[T](storageLevel: StorageLevel)
52+
extends Receiver[T](storageLevel) with Subscriber[T] { outer =>
53+
private var subscription: Subscription = null
54+
private var done: Boolean = false
55+
56+
private sealed trait SpecificationViolation {
57+
val message: String
58+
def printStackTrace(t: Option[Throwable] = None): Unit = t match {
59+
case Some(e) => new IllegalStateException(message, e).printStackTrace(System.err)
60+
case None => new IllegalStateException(message).printStackTrace(System.err)
61+
}
62+
}
63+
64+
private case class CancelException(s: Subscription) extends SpecificationViolation {
65+
val message = s"""$s violated the Reactive Streams rule 3.15
66+
| by throwing an exception from cancel.""".stripMargin.replaceAll("\n", "")
67+
}
68+
69+
private case class RequestException(s: Subscription) extends SpecificationViolation {
70+
val message = s"""$s violated the Reactive Streams rule 3.16
71+
|by throwing an exception from request.""".stripMargin.replaceAll("\n", "")
72+
}
73+
74+
private case class BeforeSubscribe() extends SpecificationViolation {
75+
val message = """Publisher violated the Reactive Streams rule 1.09 by signalling onNext,
76+
| onComplete or onError prior to onSubscribe.
77+
|""".stripMargin.replaceAll("\n", "")
78+
}
79+
80+
private case class ErrorException(c: outer.type) extends SpecificationViolation {
81+
val message = s"""$c violates the Reactive Streams rule 2.13 by throwing an
82+
| exception from onError.""".stripMargin.replaceAll("\n", "")
83+
}
84+
85+
private def finish(): Unit = {
86+
done = true
87+
if (subscription != null) Try(subscription.cancel()) recover {
88+
case t: Throwable => CancelException(subscription).printStackTrace(Some(t))
89+
}
90+
}
91+
92+
/**
93+
* Invoked by the Publisher after calling Publisher#subscribe(this).
94+
* No data will start flowing until subscription.request(Int) is invoked
95+
*p
96+
* @param s the subscription that allows requesting data via subscription.request(Int)
97+
*/
98+
final override def onSubscribe(s: Subscription): Unit = {
99+
for { sub <- Option(s) }
100+
{
101+
if (subscription != null) {
102+
Try(subscription.cancel()) recover {
103+
case t: Throwable => CancelException(subscription).printStackTrace(Some(t))
104+
}
105+
} else {
106+
subscription = s
107+
}
108+
}
109+
}
110+
111+
/**
112+
* Data notification sent by the Publisher, in reponse to requests to
113+
* subscription.request(Int) made by the reactiveCongestionStrategy.
114+
*
115+
* Performs spec compliance checks and defers to whenNext for override.
116+
*
117+
* @see whenNext
118+
* @param element The element signaled
119+
*/
120+
final override def onNext(element: T): Unit = {
121+
if (!done){
122+
if (subscription == null) {
123+
BeforeSubscribe().printStackTrace()
124+
} else {
125+
Try(whenNext(element)) match {
126+
case Success(continue) => if (!continue) finish()
127+
case Failure(f) => Try(onError(f)) recover {
128+
case t: Throwable => ErrorException(this).printStackTrace(Some(t))
129+
}
130+
}
131+
}
132+
}
133+
}
134+
135+
/**
136+
* Failed terminal state. No further events will be sent even if
137+
* subscription.request(Int) is called again
138+
*
139+
* Performs spec compliance checks and defers to whenError for override.
140+
*
141+
* @see whenError
142+
* @param error The throwable signaled
143+
*/
144+
final override def onError(error: Throwable): Unit = {
145+
if (subscription == null) {
146+
BeforeSubscribe().printStackTrace()
147+
} else {
148+
done = true
149+
whenError(error)
150+
}
151+
}
152+
153+
/**
154+
* Successful terminal state. No further events will be sent even if
155+
* subscription.request(Int) is called again
156+
*
157+
* Performs spec compliance checks and defers to whenComplete for override.
158+
*
159+
* @see whenComplete
160+
*/
161+
final override def onComplete(): Unit = {
162+
if (subscription == null) {
163+
BeforeSubscribe().printStackTrace()
164+
} else {
165+
done = true
166+
whenComplete()
167+
}
168+
}
169+
170+
/**
171+
* Data notification sent by the Publisher, in response to requests to
172+
* subscription.request() made by the reactiveCongestionStrategy.
173+
*
174+
* This is not intended to be called directly, onNext will be called instead.
175+
*
176+
* @see onNext
177+
* @param element The element signaled
178+
*/
179+
protected def whenNext(element: T): Boolean = {
180+
store(element)
181+
true
182+
}
183+
184+
/**
185+
* Successful terminal state. No further events will be sent even if
186+
* subscription.request() is called again
187+
*
188+
* This is not intended to be called directly, onComplete will be called instead.
189+
*
190+
* @see onComplete
191+
*/
192+
protected def whenComplete(): Unit = {
193+
onStop()
194+
}
195+
196+
/**
197+
* Failed terminal state. No further events will be sent even if
198+
* subscription.request() is called again
199+
*
200+
* This is not intended to be called directly, onError will be called instead.
201+
*
202+
* @see onError
203+
* @param error The throwable signaled
204+
*/
205+
protected def whenError(error: Throwable): Unit = {
206+
onStop()
207+
}
208+
209+
210+
/**
211+
* A congestion Strategy which transmits Spark's back-pressure signal to the
212+
* Publisher, through the Reactive Streams API.
213+
*
214+
* @see [[receiver.CongestionStrategy]]
215+
*/
216+
val reactiveCongestionStrategy = new CongestionStrategy {
217+
218+
override def onBlockBoundUpdate(bound: Int) = if (subscription != null) {
219+
Try(subscription.request(bound)) recover {
220+
case t: Throwable => RequestException(subscription).printStackTrace(Some(t))
221+
}
222+
}
223+
224+
override def restrictCurrentBuffer(currentBuffer: ArrayBuffer[Any],
225+
nextBuffer: ArrayBuffer[Any]): Unit = {}
226+
}
227+
228+
229+
}

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ private[streaming] class ReceiverSupervisorImpl(
114114
case "pushback" => new PushBackCongestionStrategy(blockGenerator)
115115
case "drop" => new DropCongestionStrategy()
116116
case "sampling" => new SamplingCongestionStrategy()
117+
case "reactive" => receiver match {
118+
case r: ReactiveReceiver[_] => r.reactiveCongestionStrategy
119+
case _ => throw new SparkException(
120+
"Cannot enable reactive congestion strategy on a Receiver that does not extend" +
121+
" ReactiveReceiver. See documentation for more details.")
122+
}
117123
case _ => new IgnoreCongestionStrategy()
118124
}.getOrElse {
119125
new IgnoreCongestionStrategy()

0 commit comments

Comments
 (0)