Commit 2f48789

Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase

2 parents: 8d7b82f + 716c88a
File tree: 9 files changed (+75 / -18 lines)

core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
Lines changed: 2 additions & 2 deletions

@@ -19,9 +19,9 @@ package org.apache.spark.deploy

 private[spark] object ExecutorState extends Enumeration {

-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

   type ExecutorState = Value

-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
+  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
 }

core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Lines changed: 3 additions & 2 deletions

@@ -303,10 +303,11 @@ private[spark] class Master(

           appInfo.removeExecutor(exec)
           exec.worker.removeExecutor(exec)

+          val normalExit = exitStatus.exists(_ == 0)
           // Only retry certain number of times so we don't go into an infinite loop.
-          if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
+          if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
             schedule()
-          } else {
+          } else if (!normalExit) {
             logError("Application %s with ID %s failed %d times, removing it".format(
               appInfo.desc.name, appInfo.id, appInfo.retryCount))
             removeApplication(appInfo, ApplicationState.FAILED)
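
The new normalExit check relies on Option.exists, so an executor that never reports an exit status is treated like an abnormal exit and still counts toward the retry limit. A minimal sketch of that behavior (the sample values are illustrative, not from the commit):

  val noStatus: Option[Int]  = None     // executor died without reporting a status
  val cleanExit: Option[Int] = Some(0)  // driver-requested shutdown
  val crash: Option[Int]     = Some(1)

  noStatus.exists(_ == 0)   // false -> abnormal exit, retry count is incremented
  cleanExit.exists(_ == 0)  // true  -> normal exit, no retry and no app removal
  crash.exists(_ == 0)      // false -> abnormal exit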

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
Lines changed: 3 additions & 4 deletions

@@ -154,11 +154,10 @@ private[spark] class ExecutorRunner(

       Files.write(header, stderr, Charsets.UTF_8)
       stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

-      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
-      // long-lived processes only. However, in the future, we might restart the executor a few
-      // times on the same machine.
+      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
+      // or with nonzero exit code
       val exitCode = process.waitFor()
-      state = ExecutorState.FAILED
+      state = ExecutorState.EXITED
       val message = "Command exited with code " + exitCode
       worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
     } catch {

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
Lines changed: 2 additions & 4 deletions

@@ -787,8 +787,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance

-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
-        jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }

@@ -854,8 +853,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")

-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
-        outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(conf)
       conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
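
With the isInstanceOf guard removed, output-spec validation now runs for every Hadoop OutputFormat whenever spark.hadoop.validateOutputSpecs is left at its default of true. A minimal sketch of opting out for a job that deliberately writes into an existing directory (the app name and path are illustrative):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("overwrite-example")                   // illustrative name
    .set("spark.hadoop.validateOutputSpecs", "false")  // skip checkOutputSpecs
  val sc = new SparkContext(conf)

  // With validation disabled the save no longer fails fast up front; with the
  // default of "true", checkOutputSpecs rejects an existing output path
  // before the job runs.
  sc.parallelize(1 to 10).saveAsTextFile("/tmp/existing-output")  // illustrative path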

core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
Lines changed: 3 additions & 3 deletions

@@ -26,7 +26,7 @@ class RDDInfo(

     val id: Int,
     val name: String,
     val numPartitions: Int,
-    val storageLevel: StorageLevel)
+    var storageLevel: StorageLevel)
   extends Ordered[RDDInfo] {

   var numCachedPartitions = 0

@@ -36,8 +36,8 @@ class RDDInfo(

   override def toString = {
     import Utils.bytesToString
-    ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
-      "TachyonSize: %s; DiskSize: %s").format(
+    ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
+      "MemorySize: %s; TachyonSize: %s; DiskSize: %s").format(
       name, id, storageLevel.toString, numCachedPartitions, numPartitions,
       bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
   }

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
Lines changed: 3 additions & 0 deletions

@@ -89,10 +89,13 @@ private[spark] object StorageUtils {

     // Add up memory, disk and Tachyon sizes
     val persistedBlocks =
       blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
+    val _storageLevel =
+      if (persistedBlocks.length > 0) persistedBlocks(0).storageLevel else StorageLevel.NONE
     val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
     val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
     val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
     rddInfoMap.get(rddId).map { rddInfo =>
+      rddInfo.storageLevel = _storageLevel
       rddInfo.numCachedPartitions = persistedBlocks.length
       rddInfo.memSize = memSize
       rddInfo.diskSize = diskSize

docs/programming-guide.md
Lines changed: 14 additions & 2 deletions

@@ -762,6 +762,11 @@ val counts = pairs.reduceByKey((a, b) => a + b)

 We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
 `counts.collect()` to bring them back to the driver program as an array of objects.

+**Note:** when using custom objects as the key in key-value pair operations, you must be sure that a
+custom `equals()` method is accompanied with a matching `hashCode()` method. For full details, see
+the contract outlined in the [Object.hashCode()
+documentation](http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#hashCode()).
+
 </div>

 <div data-lang="java" markdown="1">

@@ -794,6 +799,10 @@ JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

 We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
 `counts.collect()` to bring them back to the driver program as an array of objects.

+**Note:** when using custom objects as the key in key-value pair operations, you must be sure that a
+custom `equals()` method is accompanied with a matching `hashCode()` method. For full details, see
+the contract outlined in the [Object.hashCode()
+documentation](http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#hashCode()).

 </div>

@@ -890,7 +899,7 @@ for details.

 </tr>
 <tr>
   <td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td>
-  <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
+  <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) => V. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
 </tr>
 <tr>
   <td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) </td>

@@ -1058,7 +1067,10 @@ storage levels is:

   <td> Store RDD in serialized format in <a href="http://tachyon-project.org">Tachyon</a>.
     Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors
     to be smaller and to share a pool of memory, making it attractive in environments with
-    large heaps or multiple concurrent applications.
+    large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon,
+    the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory
+    in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts
+    from memory.
   </td>
 </tr>
 </table>
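
The note added to the guide is the standard equals/hashCode contract. A minimal, hypothetical Scala key class illustrating the pairing (a case class would generate both methods automatically; the explicit overrides just make the requirement visible):

  // Hypothetical key type: any two equal keys must also produce the same hash,
  // otherwise reduceByKey/groupByKey may scatter equal keys across partitions.
  class CustomerId(val id: Long) {
    override def equals(other: Any): Boolean = other match {
      case that: CustomerId => this.id == that.id
      case _                => false
    }
    override def hashCode: Int = id.hashCode  // consistent with equals above
  }

For the OFF_HEAP paragraph, usage itself is unchanged: persisting with rdd.persist(StorageLevel.OFF_HEAP) stores the serialized blocks in Tachyon, and the added text only describes how those blocks survive executor crashes and may be evicted without reconstruction.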

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
Lines changed: 18 additions & 1 deletion

@@ -187,6 +187,14 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] {

       } else {
         arg
       }
+    case Some(arg: TreeNode[_]) if children contains arg =>
+      val newChild = arg.asInstanceOf[BaseType].transformDown(rule)
+      if (!(newChild fastEquals arg)) {
+        changed = true
+        Some(newChild)
+      } else {
+        Some(arg)
+      }
     case m: Map[_,_] => m
     case args: Traversable[_] => args.map {
       case arg: TreeNode[_] if children contains arg =>

@@ -231,6 +239,14 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] {

       } else {
         arg
       }
+    case Some(arg: TreeNode[_]) if children contains arg =>
+      val newChild = arg.asInstanceOf[BaseType].transformUp(rule)
+      if (!(newChild fastEquals arg)) {
+        changed = true
+        Some(newChild)
+      } else {
+        Some(arg)
+      }
     case m: Map[_,_] => m
     case args: Traversable[_] => args.map {
       case arg: TreeNode[_] if children contains arg =>

@@ -273,7 +289,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] {

     } catch {
       case e: java.lang.IllegalArgumentException =>
         throw new TreeNodeException(
-          this, s"Failed to copy node. Is otherCopyArgs specified correctly for $nodeName?")
+          this, s"Failed to copy node. Is otherCopyArgs specified correctly for $nodeName? "
+            + s"Exception message: ${e.getMessage}.")
     }
   }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
Lines changed: 27 additions & 0 deletions

@@ -22,6 +22,17 @@ import scala.collection.mutable.ArrayBuffer

 import org.scalatest.FunSuite

 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types.{StringType, NullType}
+
+case class Dummy(optKey: Option[Expression]) extends Expression {
+  def children = optKey.toSeq
+  def references = Set.empty[Attribute]
+  def nullable = true
+  def dataType = NullType
+  override lazy val resolved = true
+  type EvaluatedType = Any
+  def eval(input: Row) = null.asInstanceOf[Any]
+}

 class TreeNodeSuite extends FunSuite {
   test("top node changed") {

@@ -75,4 +86,20 @@ class TreeNodeSuite extends FunSuite {

     assert(expected === actual)
   }
+
+  test("transform works on nodes with Option children") {
+    val dummy1 = Dummy(Some(Literal("1", StringType)))
+    val dummy2 = Dummy(None)
+    val toZero: PartialFunction[Expression, Expression] = { case Literal(_, _) => Literal(0) }
+
+    var actual = dummy1 transformDown toZero
+    assert(actual === Dummy(Some(Literal(0))))
+
+    actual = dummy1 transformUp toZero
+    assert(actual === Dummy(Some(Literal(0))))
+
+    actual = dummy2 transform toZero
+    assert(actual === Dummy(None))
+  }
 }

0 commit comments