
Commit 6437e9a

Author: Davies Liu (committed)
Merge branch 'master' of github.com:apache/spark into readwrite
2 parents: bcc6668 + fcf90b7

File tree: 96 files changed, +3572 -922 lines


core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css

Lines changed: 44 additions & 27 deletions
@@ -15,32 +15,21 @@
  * limitations under the License.
  */
 
-#dag-viz-graph svg path {
-  stroke: #444;
-  stroke-width: 1.5px;
-}
-
-#dag-viz-graph svg g.cluster rect {
-  stroke-width: 1px;
-}
-
-#dag-viz-graph svg g.node circle {
-  fill: #444;
+#dag-viz-graph a, #dag-viz-graph a:hover {
+  text-decoration: none;
 }
 
-#dag-viz-graph svg g.node rect {
-  fill: #C3EBFF;
-  stroke: #3EC0FF;
-  stroke-width: 1px;
+#dag-viz-graph .label {
+  font-weight: normal;
+  text-shadow: none;
 }
 
-#dag-viz-graph svg g.node.cached circle {
-  fill: #444;
+#dag-viz-graph svg path {
+  stroke: #444;
+  stroke-width: 1.5px;
 }
 
-#dag-viz-graph svg g.node.cached rect {
-  fill: #B3F5C5;
-  stroke: #56F578;
+#dag-viz-graph svg g.cluster rect {
   stroke-width: 1px;
 }
 
@@ -61,12 +50,23 @@
   stroke-width: 1px;
 }
 
-#dag-viz-graph svg.job g.cluster[class*="stage"] rect {
+#dag-viz-graph svg.job g.cluster.skipped rect {
+  fill: #D6D6D6;
+  stroke: #B7B7B7;
+  stroke-width: 1px;
+}
+
+#dag-viz-graph svg.job g.cluster.stage rect {
   fill: #FFFFFF;
   stroke: #FF99AC;
   stroke-width: 1px;
 }
 
+#dag-viz-graph svg.job g.cluster.stage.skipped rect {
+  stroke: #ADADAD;
+  stroke-width: 1px;
+}
+
 #dag-viz-graph svg.job g#cross-stage-edges path {
   fill: none;
 }
@@ -75,6 +75,20 @@
   fill: #333;
 }
 
+#dag-viz-graph svg.job g.cluster.skipped text {
+  fill: #666;
+}
+
+#dag-viz-graph svg.job g.node circle {
+  fill: #444;
+}
+
+#dag-viz-graph svg.job g.node.cached circle {
+  fill: #A3F545;
+  stroke: #52C366;
+  stroke-width: 2px;
+}
+
 /* Stage page specific styles */
 
 #dag-viz-graph svg.stage g.cluster rect {
@@ -83,7 +97,7 @@
   stroke-width: 1px;
 }
 
-#dag-viz-graph svg.stage g.cluster[class*="stage"] rect {
+#dag-viz-graph svg.stage g.cluster.stage rect {
   fill: #FFFFFF;
   stroke: #FFA6B6;
   stroke-width: 1px;
@@ -97,11 +111,14 @@
   fill: #333;
 }
 
-#dag-viz-graph a, #dag-viz-graph a:hover {
-  text-decoration: none;
+#dag-viz-graph svg.stage g.node rect {
+  fill: #C3EBFF;
+  stroke: #3EC0FF;
+  stroke-width: 1px;
 }
 
-#dag-viz-graph .label {
-  font-weight: normal;
-  text-shadow: none;
+#dag-viz-graph svg.stage g.node.cached rect {
+  fill: #B3F5C5;
+  stroke: #52C366;
+  stroke-width: 2px;
 }

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 32 additions & 18 deletions
@@ -57,9 +57,7 @@ var VizConstants = {
   stageSep: 40,
   graphPrefix: "graph_",
   nodePrefix: "node_",
-  stagePrefix: "stage_",
-  clusterPrefix: "cluster_",
-  stageClusterPrefix: "cluster_stage_"
+  clusterPrefix: "cluster_"
 };
 
 var JobPageVizConstants = {
@@ -133,9 +131,7 @@ function renderDagViz(forJob) {
   }
 
   // Render
-  var svg = graphContainer()
-    .append("svg")
-    .attr("class", jobOrStage);
+  var svg = graphContainer().append("svg").attr("class", jobOrStage);
   if (forJob) {
     renderDagVizForJob(svg);
   } else {
@@ -185,23 +181,32 @@ function renderDagVizForJob(svgContainer) {
     var dot = metadata.select(".dot-file").text();
     var stageId = metadata.attr("stage-id");
     var containerId = VizConstants.graphPrefix + stageId;
-    // Link each graph to the corresponding stage page (TODO: handle stage attempts)
-    var stageLink = $("#stage-" + stageId.replace(VizConstants.stagePrefix, "") + "-0")
-      .find("a")
-      .attr("href") + "&expandDagViz=true";
-    var container = svgContainer
-      .append("a")
-      .attr("xlink:href", stageLink)
-      .append("g")
-      .attr("id", containerId);
+    var isSkipped = metadata.attr("skipped") == "true";
+    var container;
+    if (isSkipped) {
+      container = svgContainer
+        .append("g")
+        .attr("id", containerId)
+        .attr("skipped", "true");
+    } else {
+      // Link each graph to the corresponding stage page (TODO: handle stage attempts)
+      // Use the link from the stage table so it also works for the history server
+      var attemptId = 0
+      var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
+        .select("a")
+        .attr("href") + "&expandDagViz=true";
+      container = svgContainer
+        .append("a")
+        .attr("xlink:href", stageLink)
+        .append("g")
+        .attr("id", containerId);
+    }
 
     // Now we need to shift the container for this stage so it doesn't overlap with
     // existing ones, taking into account the position and width of the last stage's
     // container. We do not need to do this for the first stage of this job.
     if (i > 0) {
-      var existingStages = svgContainer
-        .selectAll("g.cluster")
-        .filter("[class*=\"" + VizConstants.stageClusterPrefix + "\"]");
+      var existingStages = svgContainer.selectAll("g.cluster.stage")
       if (!existingStages.empty()) {
         var lastStage = d3.select(existingStages[0].pop());
         var lastStageWidth = toFloat(lastStage.select("rect").attr("width"));
@@ -214,6 +219,12 @@ function renderDagVizForJob(svgContainer) {
     // Actually render the stage
     renderDot(dot, container, true);
 
+    // Mark elements as skipped if appropriate. Unfortunately we need to mark all
+    // elements instead of the parent container because of CSS override rules.
+    if (isSkipped) {
+      container.selectAll("g").classed("skipped", true);
+    }
+
     // Round corners on rectangles
     container
       .selectAll("rect")
@@ -243,6 +254,9 @@ function renderDot(dot, container, forJob) {
   var renderer = new dagreD3.render();
   preprocessGraphLayout(g, forJob);
   renderer(container, g);
+
+  // Find the stage cluster and mark it for styling and post-processing
+  container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
 }
 
 /* -------------------- *

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 3 additions & 0 deletions
@@ -47,6 +47,7 @@ private[spark] class PythonRDD(
     pythonIncludes: JList[String],
     preservePartitoning: Boolean,
     pythonExec: String,
+    pythonVer: String,
     broadcastVars: JList[Broadcast[PythonBroadcast]],
     accumulator: Accumulator[JList[Array[Byte]]])
   extends RDD[Array[Byte]](parent) {
@@ -210,6 +211,8 @@ private[spark] class PythonRDD(
         val dataOut = new DataOutputStream(stream)
         // Partition index
         dataOut.writeInt(split.index)
+        // Python version of driver
+        PythonRDD.writeUTF(pythonVer, dataOut)
         // sparkFilesDir
         PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
         // Python includes (*.zip and *.egg files)
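The new pythonVer field is written to the worker stream with the same length-prefixed framing that PythonRDD.writeUTF uses for the other string fields. Below is a minimal standalone sketch of that framing, assuming a 4-byte length prefix followed by UTF-8 bytes; the object name and the example version string are illustrative, not part of the commit.

import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

object WriteUTFFramingSketch {
  // Assumed framing: big-endian 4-byte length, then the UTF-8 payload, which is
  // what the Python worker reads on the other end of the socket.
  def writeUTF(str: String, dataOut: DataOutputStream): Unit = {
    val bytes = str.getBytes(StandardCharsets.UTF_8)
    dataOut.writeInt(bytes.length)
    dataOut.write(bytes)
  }

  def main(args: Array[String]): Unit = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    writeUTF("2.7.9", out) // e.g. the driver's Python version string
    out.flush()
    println(buffer.size()) // 4-byte length prefix + 5 payload bytes = 9
  }
}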

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 5 additions & 4 deletions
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
@@ -55,18 +55,19 @@ private[spark] class CoarseGrainedExecutorBackend(
   private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()
 
   override def onStart() {
-    import scala.concurrent.ExecutionContext.Implicits.global
     logInfo("Connecting to driver: " + driverUrl)
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
+      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
       ref.ask[RegisteredExecutor.type](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
-    } onComplete {
+    }(ThreadUtils.sameThread).onComplete {
+      // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) => Utils.tryLogNonFatalError {
         Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
       }
       case Failure(e) => logError(s"Cannot register with driver: $driverUrl", e)
-    }
+    }(ThreadUtils.sameThread)
   }
 
   def extractLogUrls: Map[String, String] = {
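This change replaces the global implicit ExecutionContext with ThreadUtils.sameThread for the registration callbacks. A rough sketch of what a same-thread context looks like follows; the object name and implementation are assumptions for illustration, not Spark's actual ThreadUtils code.

import scala.concurrent.{ExecutionContext, Future}

object SameThreadSketch {
  // Callbacks run directly on the thread that completes the Future, so this is
  // only appropriate for very cheap, non-blocking continuations, as the new
  // comments in the diff point out.
  val sameThread: ExecutionContext = new ExecutionContext {
    override def execute(runnable: Runnable): Unit = runnable.run()
    override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  def main(args: Array[String]): Unit = {
    Future(1 + 1)(sameThread).foreach(n => println(s"computed $n"))(sameThread)
  }
}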

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 47 additions & 2 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.io
 
-import java.io.{InputStream, OutputStream}
+import java.io.{IOException, InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
-    new SnappyOutputStream(s, blockSize)
+    new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
   }
 
   override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
 }
+
+/**
+ * Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
+ * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
+ * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
+ */
+private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {
+
+  private[this] var closed: Boolean = false
+
+  override def write(b: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte]): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b, off, len)
+  }
+
+  override def flush(): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.flush()
+  }
+
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      os.close()
+    }
+  }
+}
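The wrapper exists because write-after-close and double-close on SnappyOutputStream could interact badly with snappy-java's buffer reuse (see SPARK-7660). A hypothetical caller-side illustration of the double-close case the wrapper makes safe, assuming spark-core and snappy-java on the classpath; the object name and the use of a ByteArrayOutputStream are only for illustration.

import java.io.ByteArrayOutputStream
import org.apache.spark.SparkConf
import org.apache.spark.io.SnappyCompressionCodec

object DoubleCloseSketch {
  def main(args: Array[String]): Unit = {
    val codec = new SnappyCompressionCodec(new SparkConf())
    val out = codec.compressedOutputStream(new ByteArrayOutputStream())
    out.write(Array[Byte](1, 2, 3))
    out.close()
    // With SnappyOutputStreamWrapper the second close is a no-op instead of
    // re-closing the already-closed underlying SnappyOutputStream.
    out.close()
  }
}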

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

Lines changed: 11 additions & 2 deletions
@@ -19,8 +19,10 @@ package org.apache.spark.rdd
 
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.spark.util.ThreadUtils
+
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.ExecutionContext
 import scala.reflect.ClassTag
 
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
@@ -66,6 +68,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
     val f = new ComplexFutureAction[Seq[T]]
 
     f.run {
+      // This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which
+      // is a cached thread pool.
       val results = new ArrayBuffer[T](num)
       val totalParts = self.partitions.length
       var partsScanned = 0
@@ -101,7 +105,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
         partsScanned += numPartsToTry
       }
       results.toSeq
-    }
+    }(AsyncRDDActions.futureExecutionContext)
 
     f
   }
@@ -123,3 +127,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       (index, data) => Unit, Unit)
   }
 }
+
+private object AsyncRDDActions {
+  val futureExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("AsyncRDDActions-future", 128))
+}
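AsyncRDDActions now supplies its own execution context backed by ThreadUtils.newDaemonCachedThreadPool("AsyncRDDActions-future", 128), so blocking async actions no longer run on the global context. A minimal sketch of what such a bounded daemon pool typically looks like; this is an illustrative stand-in, not Spark's ThreadUtils implementation.

import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object DaemonCachedPoolSketch {
  def newDaemonCachedThreadPool(prefix: String, maxThreads: Int): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        // Daemon threads so a lingering pool never keeps the JVM alive.
        val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
        t.setDaemon(true)
        t
      }
    }
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable](), factory)
    // Let idle threads time out so the pool shrinks back when no async actions are running.
    pool.allowCoreThreadTimeOut(true)
    pool
  }
}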
