Commit 4c3c566

Author: Andrew Or (committed)
Merge branch 'branch-1.4' of github.com:apache/spark into demarcate-tests-1.4

Conflicts:
  core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
  core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala

2 parents e217b78 + d0be950, commit 4c3c566

File tree: 74 files changed, +1213 -368 lines changed


.rat-excludes

Lines changed: 1 addition & 0 deletions
@@ -82,3 +82,4 @@ local-1426633911242/*
 local-1430917381534/*
 DESCRIPTION
 NAMESPACE
+test_support/*

R/pkg/inst/profile/shell.R

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@
 old <- getOption("defaultPackages")
 options(defaultPackages = c(old, "SparkR"))
 
-sc <- SparkR::sparkR.init(Sys.getenv("MASTER", unset = ""))
+sc <- SparkR::sparkR.init()
 assign("sc", sc, envir=.GlobalEnv)
 sqlContext <- SparkR::sparkRSQL.init(sc)
 assign("sqlContext", sqlContext, envir=.GlobalEnv)

core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
       retryHeaders, retryRow, Iterable.apply(driverState.description.retryState))
     val content =
       <p>Driver state information for driver id {driverId}</p>
-      <a href="/">Back to Drivers</a>
+      <a href={UIUtils.prependBaseUri("/")}>Back to Drivers</a>
       <div class="row-fluid">
         <div class="span12">
           <h4>Driver state: {driverState.state}</h4>
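
The one-line fix above stops hard-coding the link target "/" and instead routes it through UIUtils.prependBaseUri, so the "Back to Drivers" link still resolves when the Mesos cluster UI is served behind a reverse proxy. A minimal sketch of the idea, assuming a configurable proxy base; the object name and the use of the spark.ui.proxyBase system property here are illustrative assumptions, not the exact Spark implementation:

// Sketch: prepend a configured base path to absolute UI links.
object BaseUriSketch {
  def prependBaseUri(resource: String): String = {
    // Fall back to "" when no proxy base is configured, so links behave
    // exactly like the old hard-coded "/" in a direct deployment.
    val base = sys.props.getOrElse("spark.ui.proxyBase", "")
    base + resource
  }

  def main(args: Array[String]): Unit = {
    println(prependBaseUri("/"))                     // "/" when unproxied
    sys.props("spark.ui.proxyBase") = "/proxy/app-1"
    println(prependBaseUri("/"))                     // "/proxy/app-1/"
  }
}

With no base configured the helper degrades to the old behavior, so direct deployments are unaffected.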

core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala

Lines changed: 9 additions & 0 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy.worker.ui
 
+import java.io.File
+import java.net.URI
 import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
@@ -135,6 +137,13 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with
       return ("Error: Log type must be one of " + supportedLogTypes.mkString(", "), 0, 0, 0)
     }
 
+    // Verify that the normalized path of the log directory is in the working directory
+    val normalizedUri = new URI(logDirectory).normalize()
+    val normalizedLogDir = new File(normalizedUri.getPath)
+    if (!Utils.isInDirectory(workDir, normalizedLogDir)) {
+      return ("Error: invalid log directory " + logDirectory, 0, 0, 0)
+    }
+
     try {
       val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, logType)
       logDebug(s"Sorted log files of type $logType in $logDirectory:\n${files.mkString("\n")}")

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 30 additions & 0 deletions
@@ -17,8 +17,12 @@
 
 package org.apache.spark.ui.jobs
 
+import java.util.concurrent.TimeoutException
+
 import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 
+import com.google.common.annotations.VisibleForTesting
+
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
@@ -526,4 +530,30 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   override def onApplicationStart(appStarted: SparkListenerApplicationStart) {
     startTime = appStarted.time
   }
+
+  /**
+   * For testing only. Wait until at least `numExecutors` executors are up, or throw
+   * `TimeoutException` if the waiting time elapses before `numExecutors` executors are up.
+   *
+   * @param numExecutors the minimum number of executors to wait for
+   * @param timeout time to wait in milliseconds
+   */
+  @VisibleForTesting
+  private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): Unit = {
+    val finishTime = System.currentTimeMillis() + timeout
+    while (System.currentTimeMillis() < finishTime) {
+      val numBlockManagers = synchronized {
+        blockManagerIds.size
+      }
+      if (numBlockManagers >= numExecutors + 1) {
+        // Need to count the block manager on the driver as well
+        return
+      }
+      // Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+      // add overhead in the general case.
+      Thread.sleep(10)
+    }
+    throw new TimeoutException(
+      s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
+  }
 }
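
The new helper is a poll-and-sleep loop: sample the condition, sleep 10 ms, repeat until a deadline, and throw rather than return a boolean that a caller might forget to assert on. A generic sketch of the same pattern; this standalone form is our illustration, not code from the commit:

import java.util.concurrent.TimeoutException

object PollingSketch {
  def waitUntil(timeoutMillis: Long)(condition: => Boolean): Unit = {
    val finishTime = System.currentTimeMillis() + timeoutMillis
    while (System.currentTimeMillis() < finishTime) {
      if (condition) return
      // Sleep rather than wait/notify: simpler, and acceptable for test-only code.
      Thread.sleep(10)
    }
    throw new TimeoutException(s"Condition not met after $timeoutMillis ms")
  }
}

Concrete uses of the real method appear in the ExternalShuffleServiceSuite and BroadcastSuite hunks below, e.g. sc.jobProgressListener.waitUntilExecutorsUp(2, 10000).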

core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala

Lines changed: 6 additions & 5 deletions
@@ -120,21 +120,22 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
 
   /**
    * For testing only. Wait until there are no more events in the queue, or until the specified
-   * time has elapsed. Return true if the queue has emptied and false is the specified time
-   * elapsed before the queue emptied.
+   * time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue
+   * emptied.
    */
   @VisibleForTesting
-  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+  @throws(classOf[TimeoutException])
+  def waitUntilEmpty(timeoutMillis: Long): Unit = {
     val finishTime = System.currentTimeMillis + timeoutMillis
     while (!queueIsEmpty) {
       if (System.currentTimeMillis > finishTime) {
-        return false
+        throw new TimeoutException(
+          s"The event queue is not empty after $timeoutMillis milliseconds")
       }
       /* Sleep rather than using wait/notify, because this is used only for testing and
        * wait/notify add overhead in the general case. */
       Thread.sleep(10)
     }
-    true
   }
 
   /**
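
Because the return type changes from Boolean to Unit, call sites drop their assert(...) wrappers (see the LogUrlsStandaloneSuite hunk below), and a forgotten assert can no longer silently swallow a timeout. A hypothetical call-site sketch, with a function parameter standing in for the bus method since AsynchronousListenerBus is private[spark]:

import java.util.concurrent.TimeoutException

// Wrap the new-style call when a test wants its own failure message.
def drainOrFail(waitUntilEmpty: Long => Unit, timeoutMillis: Long): Unit = {
  try {
    waitUntilEmpty(timeoutMillis)
  } catch {
    case e: TimeoutException =>
      throw new AssertionError(s"listener bus did not drain within $timeoutMillis ms", e)
  }
}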

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 16 additions & 0 deletions
@@ -2227,6 +2227,22 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Return whether the child file or directory is contained in (or equal to) the parent directory.
+   */
+  def isInDirectory(parent: File, child: File): Boolean = {
+    if (child == null || parent == null) {
+      return false
+    }
+    if (!child.exists() || !parent.exists() || !parent.isDirectory()) {
+      return false
+    }
+    if (parent.equals(child)) {
+      return true
+    }
+    isInDirectory(parent, child.getParentFile)
+  }
+
 }
 
 private [util] class SparkShutdownHookManager {
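
The recursion walks up the child's parent chain until it either reaches the parent directory (true) or falls off the filesystem root (false); note that both paths must actually exist on disk, so this checks real files rather than strings. A runnable sketch of the semantics, with the helper body mirrored from the hunk above and the /tmp paths assumed for illustration:

import java.io.File

object IsInDirectorySketch {
  def isInDirectory(parent: File, child: File): Boolean = {
    if (child == null || parent == null) return false
    if (!child.exists() || !parent.exists() || !parent.isDirectory()) return false
    if (parent.equals(child)) return true
    isInDirectory(parent, child.getParentFile)
  }

  def main(args: Array[String]): Unit = {
    val work = new File("/tmp/work/app-1")
    work.mkdirs()                                   // create the example tree
    val log = new File(work, "stdout")
    log.createNewFile()
    println(isInDirectory(new File("/tmp/work"), log))                     // true
    println(isInDirectory(new File("/tmp/work"), new File("/etc/passwd"))) // false
  }
}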

core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -55,6 +55,14 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
     sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
     sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])
 
+    // On a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
+    // If we don't wait for all slaves, it's possible that only one executor runs all jobs. Then
+    // all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch
+    // local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
+    // In this case, we won't receive FetchFailed, and that will make this test fail.
+    // Therefore, we should wait until all slaves are up.
+    sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)
+
     val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)
 
     rdd.count()

core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala

Lines changed: 1 addition & 8 deletions
@@ -17,7 +17,6 @@
 
 package org.apache.spark.broadcast
 
-import scala.concurrent.duration._
 import scala.util.Random
 
 import org.scalatest.Assertions
@@ -312,13 +311,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
       val _sc =
         new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
       // Wait until all slaves are up
-      eventually(timeout(10.seconds), interval(10.milliseconds)) {
-        _sc.jobProgressListener.synchronized {
-          val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
-          assert(numBlockManagers == numSlaves + 1,
-            s"Expect ${numSlaves + 1} block managers, but was ${numBlockManagers}")
-        }
-      }
+      _sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
       _sc
     } else {
       new SparkContext("local", "test", broadcastConf)

core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -41,7 +41,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
     // Trigger a job so that executors get added
     sc.parallelize(1 to 100, 4).map(_.toString).count()
 
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     listener.addedExecutorInfos.values.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
       // Browse to each URL to check that it's valid
@@ -71,7 +71,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
     // Trigger a job so that executors get added
     sc.parallelize(1 to 100, 4).map(_.toString).count()
 
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
     assert(listeners.size === 1)
     val listener = listeners(0)
