Commit 61e5a87

Author: liguoqiang
Commit message: Merge branch 'master' into SPARK-1149
2 parents: e68210a + 3a8b698

13 files changed: 206 additions & 88 deletions

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 10 additions & 8 deletions
@@ -25,6 +25,8 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{SparkContext, SparkException}
 
+import scala.collection.JavaConversions._
+
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
@@ -33,15 +35,9 @@ class SparkHadoopUtil {
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {
-    // if we are already running as the user intended there is no reason to do the doAs. It
-    // will actually break secure HDFS access as it doesn't fill in the credentials. Also if
-    // the user is UNKNOWN then we shouldn't be creating a remote unknown user
-    // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
-    // in SparkContext.
-    val currentUser = Option(System.getProperty("user.name")).
-      getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-    if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+    if (user != SparkContext.SPARK_UNKNOWN_USER) {
       val ugi = UserGroupInformation.createRemoteUser(user)
+      transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
       ugi.doAs(new PrivilegedExceptionAction[Unit] {
        def run: Unit = func()
       })
@@ -50,6 +46,12 @@ class SparkHadoopUtil {
     }
   }
 
+  def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
+    for (token <- source.getTokens()) {
+      dest.addToken(token)
+    }
+  }
+
   /**
   * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
   * subsystems.
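For context on the hunk above: the new transferCredentials call matters because UserGroupInformation.createRemoteUser returns a UGI with an empty credential set, so work run inside its doAs block would lose access to secure HDFS unless the caller's delegation tokens are copied over first. A minimal standalone sketch of the same pattern (RunAsExample and runAs are illustrative names, not Spark API):

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

import scala.collection.JavaConversions._

// Illustrative helper: run a block as another user while carrying the
// caller's Hadoop delegation tokens across the doAs boundary.
object RunAsExample {
  def runAs(user: String)(body: => Unit) {
    val ugi = UserGroupInformation.createRemoteUser(user)
    // createRemoteUser starts with no tokens; copy them from the current UGI,
    // otherwise secure HDFS calls inside doAs would fail to authenticate.
    for (token <- UserGroupInformation.getCurrentUser().getTokens()) {
      ugi.addToken(token)
    }
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      def run(): Unit = body
    })
  }
}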

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 17 additions & 10 deletions
@@ -30,18 +30,15 @@ import scala.reflect.ClassTag
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
-import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 
 // SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
 import org.apache.hadoop.mapred.SparkHadoopWriter
-import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
 
 import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
@@ -604,8 +601,12 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
+
     val wrappedConf = new SerializableWritable(job.getConfiguration)
-    NewFileOutputFormat.setOutputPath(job, new Path(path))
+    val outpath = new Path(path)
+    NewFileOutputFormat.setOutputPath(job, outpath)
+    val jobFormat = outputFormatClass.newInstance
+    jobFormat.checkOutputSpecs(job)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
@@ -633,7 +634,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       committer.commitTask(hadoopContext)
       return 1
     }
-    val jobFormat = outputFormatClass.newInstance
+
     /* apparently we need a TaskAttemptID to construct an OutputCommitter;
      * however we're only going to use this local OutputCommitter for
      * setupJob/commitJob, so we just use a dummy "map" task.
@@ -642,7 +643,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
     val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
     jobCommitter.setupJob(jobTaskContext)
-    val count = self.context.runJob(self, writeShard _).sum
+    self.context.runJob(self, writeShard _)
    jobCommitter.commitJob(jobTaskContext)
   }
 
@@ -696,10 +697,10 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * MapReduce job.
    */
  def saveAsHadoopDataset(conf: JobConf) {
-    val outputFormatClass = conf.getOutputFormat
+    val outputFormatInstance = conf.getOutputFormat
     val keyClass = conf.getOutputKeyClass
     val valueClass = conf.getOutputValueClass
-    if (outputFormatClass == null) {
+    if (outputFormatInstance == null) {
       throw new SparkException("Output format class not set")
     }
     if (keyClass == null) {
@@ -712,6 +713,12 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
+    if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+      // FileOutputFormat ignores the filesystem parameter
+      val ignoredFs = FileSystem.get(conf)
+      conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
+    }
+
     val writer = new SparkHadoopWriter(conf)
     writer.preSetup()
 
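The checkOutputSpecs calls added above change the user-visible behaviour of the save methods: writing to a path that already exists now fails at job-submission time with FileAlreadyExistsException (for FileOutputFormat-based formats) instead of proceeding into the tasks. A minimal sketch of that effect, assuming a local master and a hypothetical scratch path:

import org.apache.hadoop.mapred.FileAlreadyExistsException

import org.apache.spark.SparkContext

object OverwriteCheckSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "overwrite-check-sketch")
    val pairs = sc.parallelize(Seq((1, "a"), (2, "b")))
    val out = "/tmp/overwrite-check-sketch"  // hypothetical output path
    pairs.saveAsTextFile(out)                // first save creates the directory
    try {
      // A second save to the same path is now rejected by checkOutputSpecs
      // before any tasks run.
      pairs.saveAsTextFile(out)
    } catch {
      case e: FileAlreadyExistsException => println("rejected: " + e.getMessage)
    }
    sc.stop()
  }
}
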
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 9 additions & 6 deletions
@@ -25,6 +25,7 @@ import scala.concurrent.duration._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
+import scala.util.Random
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
@@ -207,9 +208,11 @@ private[spark] class TaskSchedulerImpl(
       }
     }
 
-    // Build a list of tasks to assign to each worker
-    val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
-    val availableCpus = offers.map(o => o.cores).toArray
+    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
+    val shuffledOffers = Random.shuffle(offers)
+    // Build a list of tasks to assign to each worker.
+    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+    val availableCpus = shuffledOffers.map(o => o.cores).toArray
     val sortedTaskSets = rootPool.getSortedTaskSetQueue()
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -222,9 +225,9 @@ private[spark] class TaskSchedulerImpl(
     for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
       do {
         launchedTask = false
-        for (i <- 0 until offers.size) {
-          val execId = offers(i).executorId
-          val host = offers(i).host
+        for (i <- 0 until shuffledOffers.size) {
+          val execId = shuffledOffers(i).executorId
+          val host = shuffledOffers(i).host
           for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
             tasks(i) += task
             val tid = task.taskId
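The change above relies only on a standard-library property: scala.util.Random.shuffle returns the same elements in a random order, so every offer is still considered but the scan no longer always starts from the first worker. A small sketch using a stand-in case class (Offer here is illustrative; the scheduler itself uses WorkerOffer):

import scala.util.Random

object ShuffleOffersSketch {
  // Stand-in for the scheduler's WorkerOffer(executorId, host, cores).
  case class Offer(executorId: String, host: String, cores: Int)

  def main(args: Array[String]) {
    val offers = Seq(Offer("executor0", "host0", 1), Offer("executor1", "host1", 1))
    // Same elements, order varies between calls, so repeated single-task
    // scheduling rounds get spread across both executors over time.
    val shuffled = Random.shuffle(offers)
    assert(shuffled.toSet == offers.toSet)
    println(shuffled.map(_.executorId).mkString(", "))
  }
}
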

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 42 additions & 0 deletions
@@ -24,9 +24,11 @@ import scala.io.Source
 import com.google.common.io.Files
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
+import org.apache.hadoop.mapred.FileAlreadyExistsException
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
 class FileSuite extends FunSuite with LocalSparkContext {
 
@@ -208,4 +210,44 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(rdd.count() === 3)
     assert(rdd.count() === 3)
   }
+
+  test ("prevent user from overwriting the empty directory (old Hadoop API)") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+    intercept[FileAlreadyExistsException] {
+      randomRDD.saveAsTextFile(tempdir.getPath)
+    }
+  }
+
+  test ("prevent user from overwriting the non-empty directory (old Hadoop API)") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+    randomRDD.saveAsTextFile(tempdir.getPath + "/output")
+    assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
+    intercept[FileAlreadyExistsException] {
+      randomRDD.saveAsTextFile(tempdir.getPath + "/output")
+    }
+  }
+
+  test ("prevent user from overwriting the empty directory (new Hadoop API)") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    intercept[FileAlreadyExistsException] {
+      randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+    }
+  }
+
+  test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    randomRDD.saveAsTextFile(tempdir.getPath + "/output")
+    assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
+    intercept[FileAlreadyExistsException] {
+      randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+    }
+  }
 }

core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala

Lines changed: 16 additions & 0 deletions
@@ -24,3 +24,19 @@ class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int
 
   override def preferredLocations: Seq[TaskLocation] = prefLocs
 }
+
+object FakeTask {
+  /**
+   * Utility method to create a TaskSet, potentially setting a particular sequence of preferred
+   * locations for each task (given as varargs) if this sequence is not empty.
+   */
+  def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
+    if (prefLocs.size != 0 && prefLocs.size != numTasks) {
+      throw new IllegalArgumentException("Wrong number of task locations")
+    }
+    val tasks = Array.tabulate[Task[_]](numTasks) { i =>
+      new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
+    }
+    new TaskSet(tasks, 0, 0, 0, null)
+  }
+}
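A hedged usage sketch of the new helper, meant to run inside a ScalaTest FunSuite body as in TaskSchedulerImplSuite (intercept and === come from ScalaTest, not from this commit):

// One task, no locality preference -- the shape used by the scheduler tests.
val taskSet = FakeTask.createTaskSet(1)
assert(taskSet.tasks.length === 1)

// Passing a number of preferred-location lists that does not match numTasks
// is rejected up front (here: one list for two tasks).
intercept[IllegalArgumentException] {
  FakeTask.createTaskSet(2, Nil)
}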

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 43 additions & 13 deletions
@@ -25,6 +25,13 @@ import org.scalatest.FunSuite
 
 import org.apache.spark._
 
+class FakeSchedulerBackend extends SchedulerBackend {
+  def start() {}
+  def stop() {}
+  def reviveOffers() {}
+  def defaultParallelism() = 1
+}
+
 class FakeTaskSetManager(
     initPriority: Int,
     initStageId: Int,
@@ -107,7 +114,8 @@ class FakeTaskSetManager(
 
 class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging {
 
-  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, taskSet: TaskSet): FakeTaskSetManager = {
+  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl,
+      taskSet: TaskSet): FakeTaskSetManager = {
     new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
   }
 
@@ -135,10 +143,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
   test("FIFO Scheduler Test") {
     sc = new SparkContext("local", "TaskSchedulerImplSuite")
     val taskScheduler = new TaskSchedulerImpl(sc)
-    var tasks = ArrayBuffer[Task[_]]()
-    val task = new FakeTask(0)
-    tasks += task
-    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+    val taskSet = FakeTask.createTaskSet(1)
 
     val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
     val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
@@ -162,10 +167,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
   test("Fair Scheduler Test") {
     sc = new SparkContext("local", "TaskSchedulerImplSuite")
     val taskScheduler = new TaskSchedulerImpl(sc)
-    var tasks = ArrayBuffer[Task[_]]()
-    val task = new FakeTask(0)
-    tasks += task
-    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+    val taskSet = FakeTask.createTaskSet(1)
 
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
     System.setProperty("spark.scheduler.allocation.file", xmlPath)
@@ -219,10 +221,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
   test("Nested Pool Test") {
     sc = new SparkContext("local", "TaskSchedulerImplSuite")
     val taskScheduler = new TaskSchedulerImpl(sc)
-    var tasks = ArrayBuffer[Task[_]]()
-    val task = new FakeTask(0)
-    tasks += task
-    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+    val taskSet = FakeTask.createTaskSet(1)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
     val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
@@ -265,4 +264,35 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
     checkTaskSetId(rootPool, 6)
     checkTaskSetId(rootPool, 2)
   }
+
+  test("Scheduler does not always schedule tasks on the same workers") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    var dagScheduler = new DAGScheduler(taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorGained(execId: String, host: String) {}
+    }
+
+    val numFreeCores = 1
+    val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
+      new WorkerOffer("executor1", "host1", numFreeCores))
+    // Repeatedly try to schedule a 1-task job, and make sure that it doesn't always
+    // get scheduled on the same executor. While there is a chance this test will fail
+    // because the task randomly gets placed on the first executor all 1000 times, the
+    // probability of that happening is 2^-1000 (so sufficiently small to be considered
+    // negligible).
+    val numTrials = 1000
+    val selectedExecutorIds = 1.to(numTrials).map { _ =>
+      val taskSet = FakeTask.createTaskSet(1)
+      taskScheduler.submitTasks(taskSet)
+      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+      assert(1 === taskDescriptions.length)
+      taskDescriptions(0).executorId
+    }
+    var count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
+    assert(count > 0)
+    assert(count < numTrials)
+  }
 }
