@@ -155,16 +155,23 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
155155    })
156156
157157    val  conf  =  new  SparkConf ()
158+     val  shortProp  =  " spark.rpc.short.timeout" 
158159    conf.set(" spark.rpc.retry.wait" " 0" 
159160    conf.set(" spark.rpc.numRetries" " 1" 
160161    val  anotherEnv  =  createRpcEnv(conf, " remote" 13345 )
161162    //  Use anotherEnv to find out the RpcEndpointRef
162163    val  rpcEndpointRef  =  anotherEnv.setupEndpointRef(" local" " ask-timeout" 
163164    try  {
164165      val  e  =  intercept[Exception ] {
165-         rpcEndpointRef.askWithRetry[String ](" hello" 1  millis)
166+         rpcEndpointRef.askWithRetry[String ](" hello" new   RpcTimeout ( 1  millis, shortProp) )
166167      }
167168      assert(e.isInstanceOf [TimeoutException ] ||  e.getCause.isInstanceOf [TimeoutException ])
169+       e match  {
170+         case  te : TimeoutException  => 
171+           assert(te.getMessage().contains(shortProp))
172+         case  e : Exception  => 
173+           assert(e.getCause().getMessage().contains(shortProp))
174+       }
168175    } finally  {
169176      anotherEnv.shutdown()
170177      anotherEnv.awaitTermination()
@@ -539,6 +546,22 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
539546    }
540547  }
541548
549+   test(" construction of RpcTimeout using properties" 
550+     val  conf  =  new  SparkConf 
551+ 
552+     val  testProp  =  " spark.ask.test.timeout" 
553+     val  testDurationSeconds  =  30 
554+ 
555+     conf.set(testProp, testDurationSeconds.toString +  " s" 
556+ 
557+     val  rt  =  RpcTimeout (conf, testProp)
558+     assert( testDurationSeconds ===  rt.duration.toSeconds )
559+ 
560+     val  ex  =  intercept[Throwable ] {
561+       RpcTimeout (conf, " spark.ask.invalid.timeout" 
562+     }
563+   }
564+ 
542565}
543566
544567class  UnserializableClass 
0 commit comments