Skip to content

Commit b1de461

Browse files
ilayaperumalgtdas
authored andcommitted
[SPARK-4802] [streaming] Remove receiverInfo once receiver is de-registered
Once the streaming receiver is de-registered at executor, the `ReceiverTrackerActor` needs to remove the corresponding reveiverInfo from the `receiverInfo` map at `ReceiverTracker`. Author: Ilayaperumal Gopinathan <[email protected]> Closes #3647 from ilayaperumalg/receiverInfo-RTracker and squashes the following commits: 6eb97d5 [Ilayaperumal Gopinathan] Polishing based on the review 3640c86 [Ilayaperumal Gopinathan] Remove receiverInfo once receiver is de-registered (cherry picked from commit 10d69e9) Signed-off-by: Tathagata Das <[email protected]> Conflicts: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
1 parent 3bce43f commit b1de461

File tree

1 file changed

+2
-2
lines changed

1 file changed

+2
-2
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
139139
logWarning("No prior receiver info")
140140
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
141141
}
142-
receiverInfo(streamId) = newReceiverInfo
143-
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId)))
142+
receiverInfo -= streamId
143+
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
144144
val messageWithError = if (error != null && !error.isEmpty) {
145145
s"$message - $error"
146146
} else {

0 commit comments

Comments
 (0)