@@ -59,72 +59,60 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
5959 }
6060 }
6161
62- test(" Future failure with RpcTimeout" ) {
62+ test(" timeout on ask Future with RpcTimeout" ) {
6363
64- class EchoActor extends Actor {
64+ class EchoActor ( sleepDuration : Long ) extends Actor {
6565 def receive : Receive = {
6666 case msg =>
67- Thread .sleep(500 )
67+ Thread .sleep(sleepDuration )
6868 sender() ! msg
6969 }
7070 }
7171
7272 val system = ActorSystem (" EchoSystem" )
73- val echoActor = system.actorOf(Props (new EchoActor ), name = " echoA" )
73+ val echoActor = system.actorOf(Props (new EchoActor (0 )), name = " echo" )
74+ val sleepyActor = system.actorOf(Props (new EchoActor (50 )), name = " sleepy" )
7475
75- val timeout = new RpcTimeout (50 millis, " spark.rpc.short.timeout" )
76+ val shortProp = " spark.rpc.short.timeout"
77+ val timeout = new RpcTimeout (10 millis, shortProp)
7678
77- val fut = echoActor.ask(" hello" )(1000 millis).mapTo[String ].recover {
78- case te : TimeoutException => throw timeout.amend(te)
79- }
79+ try {
8080
81- fut.onFailure {
82- case te : TimeoutException => println( " failed with timeout exception " )
83- }
81+ // Ask with immediate response
82+ var fut = echoActor.ask( " hello " )(timeout.duration).mapTo[ String ].
83+ recover(timeout.addMessageIfTimeout)
8484
85- fut.onComplete {
86- case Success (str) => println(" future success" )
87- case Failure (ex) => println(" future failure" )
88- }
85+ // This should complete successfully
86+ val result = timeout.awaitResult(fut)
8987
90- println(" sleeping" )
91- Thread .sleep(50 )
92- println(" Future complete: " + fut.isCompleted.toString() + " , " + fut.value.toString())
88+ assert(result.nonEmpty)
9389
94- println(" Caught TimeoutException: " +
95- intercept[TimeoutException ] {
96- // timeout.awaitResult(fut) // prints RpcTimeout description twice
97- Await .result(fut, 10 millis)
98- }.getMessage()
99- )
90+ // Ask with delayed response
91+ fut = sleepyActor.ask(" goodbye" )(timeout.duration).mapTo[String ].
92+ recover(timeout.addMessageIfTimeout)
10093
101- /*
102- val ref = env.setupEndpoint("test_future", new RpcEndpoint {
103- override val rpcEnv = env
94+ // Allow future to complete with failure using plain Await.result, this will return
95+ // once the future is complete
96+ val msg1 =
97+ intercept[RpcTimeoutException ] {
98+ Await .result(fut, 200 millis)
99+ }.getMessage()
104100
105- override def receive = {
106- case _ =>
107- }
108- })
109- val conf = new SparkConf()
110- val newRpcEnv = new AkkaRpcEnvFactory().create(
111- RpcEnvConfig(conf, "test", "localhost", 12346, new SecurityManager(conf)))
112- try {
113- val newRef = newRpcEnv.setupEndpointRef("local", ref.address, "test_future")
114- val akkaActorRef = newRef.asInstanceOf[AkkaRpcEndpointRef].actorRef
101+ assert(msg1.contains(shortProp))
115102
116- val timeout = new RpcTimeout(1 millis, "spark.rpc.short.timeout")
117- val fut = akkaActorRef.ask("hello")(timeout.duration).mapTo[String]
103+ // Use RpcTimeout.awaitResult to process Future, since it has already failed with
104+ // RpcTimeoutException, the same exception should be thrown
105+ val msg2 =
106+ intercept[RpcTimeoutException ] {
107+ timeout.awaitResult(fut)
108+ }.getMessage()
118109
119- Thread.sleep(500)
120- println("Future complete: " + fut.isCompleted.toString() + ", " + fut.value.toString() )
110+ // Ensure description is not in message twice after addMessageIfTimeout and awaitResult
111+ assert(shortProp.r.findAllIn(msg2).length === 1 )
121112
122113 } finally {
123- newRpcEnv .shutdown()
114+ system .shutdown()
124115 }
125- */
126-
127-
128116 }
129117
130118}
0 commit comments