Skip to content

Commit 29c1771

Browse files
committed
Change the ⇒ character (maybe from scalariform) to => in Scala code for style consistency.
1 parent a8cf3ec commit 29c1771

File tree

5 files changed

+15
-18
lines changed

5 files changed

+15
-18
lines changed

examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ extends Actor with Receiver {
8888
override def preStart = remotePublisher ! SubscribeReceiver(context.self)
8989

9090
def receive = {
91-
case msg pushBlock(msg.asInstanceOf[T])
91+
case msg => pushBlock(msg.asInstanceOf[T])
9292
}
9393

9494
override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)

external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,11 @@
1717

1818
package org.apache.spark.streaming.twitter
1919

20-
import java.util.prefs.Preferences
2120
import twitter4j._
2221
import twitter4j.auth.Authorization
2322
import twitter4j.conf.ConfigurationBuilder
24-
import twitter4j.conf.PropertyConfiguration
2523
import twitter4j.auth.OAuthAuthorization
26-
import twitter4j.auth.AccessToken
27-
import org.apache.spark._
24+
2825
import org.apache.spark.streaming._
2926
import org.apache.spark.streaming.dstream._
3027
import org.apache.spark.storage.StorageLevel

external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,24 @@ import org.apache.spark.streaming.receivers._
3131
*/
3232
private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
3333
subscribe: Subscribe,
34-
bytesToObjects: Seq[ByteString] Iterator[T])
34+
bytesToObjects: Seq[ByteString] => Iterator[T])
3535
extends Actor with Receiver with Logging {
3636

3737
override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
3838
Connect(publisherUrl), subscribe)
3939

4040
def receive: Receive = {
4141

42-
case Connecting logInfo("connecting ...")
42+
case Connecting => logInfo("connecting ...")
4343

44-
case m: ZMQMessage
44+
case m: ZMQMessage =>
4545
logDebug("Received message for:" + m.frame(0))
4646

4747
//We ignore first frame for processing as it is the topic
4848
val bytes = m.frames.tail
4949
pushBlock(bytesToObjects(bytes))
5050

51-
case Closed logInfo("received closed ")
51+
case Closed => logInfo("received closed ")
5252

5353
}
5454
}

external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ object ZeroMQUtils {
4646
ssc: StreamingContext,
4747
publisherUrl: String,
4848
subscribe: Subscribe,
49-
bytesToObjects: Seq[ByteString] Iterator[T],
49+
bytesToObjects: Seq[ByteString] => Iterator[T],
5050
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
5151
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
5252
): DStream[T] = {

streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ object ReceiverSupervisorStrategy {
3737

3838
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
3939
15 millis) {
40-
case _: RuntimeException Restart
41-
case _: Exception Escalate
40+
case _: RuntimeException => Restart
41+
case _: Exception => Escalate
4242
}
4343
}
4444

@@ -66,7 +66,7 @@ object ReceiverSupervisorStrategy {
6666
*/
6767
trait Receiver {
6868

69-
self: Actor // to ensure that this can be added to Actor classes only
69+
self: Actor => // to ensure that this can be added to Actor classes only
7070

7171
/**
7272
* Push an iterator received data into Spark Streaming for processing
@@ -139,25 +139,25 @@ private[streaming] class ActorReceiver[T: ClassTag](
139139

140140
def receive = {
141141

142-
case Data(iter: Iterator[_]) pushBlock(iter.asInstanceOf[Iterator[T]])
142+
case Data(iter: Iterator[_]) => pushBlock(iter.asInstanceOf[Iterator[T]])
143143

144-
case Data(msg)
144+
case Data(msg) =>
145145
blocksGenerator += msg.asInstanceOf[T]
146146
n.incrementAndGet
147147

148-
case props: Props
148+
case props: Props =>
149149
val worker = context.actorOf(props)
150150
logInfo("Started receiver worker at:" + worker.path)
151151
sender ! worker
152152

153-
case (props: Props, name: String)
153+
case (props: Props, name: String) =>
154154
val worker = context.actorOf(props, name)
155155
logInfo("Started receiver worker at:" + worker.path)
156156
sender ! worker
157157

158158
case _: PossiblyHarmful => hiccups.incrementAndGet()
159159

160-
case _: Statistics
160+
case _: Statistics =>
161161
val workers = context.children
162162
sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
163163

0 commit comments

Comments
 (0)