Closed
Changes from all commits
122 commits
ba4f8ca
[MINOR] [SQL] Removes an unreachable case clause
liancheng May 17, 2015
1a7b9ce
[MINOR] Add 1.3, 1.3.1 to master branch EC2 scripts
shivaram May 17, 2015
edf09ea
[SQL] [MINOR] Skip unresolved expression for InConversion
scwf May 17, 2015
3399055
[SPARK-7447] [SQL] Don't re-merge Parquet schema when the relation is…
viirya May 17, 2015
5021766
[SPARK-7669] Builds against Hadoop 2.6+ get inconsistent curator depend…
steveloughran May 17, 2015
f2cc6b5
[SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug
JoshRosen May 17, 2015
5645628
[SPARK-7686] [SQL] DescribeCommand is assigned wrong output attribute…
JoshRosen May 17, 2015
2ca60ac
[SPARK-7491] [SQL] Allow configuration of classloader isolation for hive
marmbrus May 17, 2015
ca4257a
[SPARK-6514] [SPARK-5960] [SPARK-6656] [SPARK-7679] [STREAMING] [KINE…
tdas May 17, 2015
2f22424
[SQL] [MINOR] use catalyst type converter in ScalaUdf
cloud-fan May 17, 2015
ff71d34
[SPARK-7693][Core] Remove "import scala.concurrent.ExecutionContext.I…
zsxwing May 18, 2015
775e6f9
[SPARK-7694] [MLLIB] Use getOrElse for getting the threshold of LR model
coderxiang May 18, 2015
e32c0f6
[SPARK-7299][SQL] Set precision and scale for Decimal according to JD…
viirya May 18, 2015
1ecfac6
[SPARK-6657] [PYSPARK] Fix doc warnings
mengxr May 18, 2015
814b3da
[SPARK-7272] [MLLIB] User guide for PMML model export
selvinsource May 18, 2015
563bfcc
[SPARK-7627] [SPARK-7472] DAG visualization: style skipped stages
May 18, 2015
e1ac2a9
[SPARK-6888] [SQL] Make the jdbc driver handling user-definable
rtreffer May 18, 2015
010a1c2
[SPARK-7570] [SQL] Ignores _temporary during partition discovery
liancheng May 18, 2015
56ede88
[SQL] [MINOR] [THIS] use private for internal field in ScalaUdf
cloud-fan May 18, 2015
9c7e802
[SPARK-7380] [MLLIB] pipeline stages should be copyable in Python
mengxr May 18, 2015
aa31e43
[SPARK-2883] [SQL] ORC data source for Spark SQL
zhzhan May 18, 2015
fc2480e
[SPARK-7631] [SQL] treenode argString should not print children
scwf May 18, 2015
103c863
[SPARK-7269] [SQL] Incorrect analysis for aggregation(use semanticEqu…
cloud-fan May 18, 2015
530397b
[SPARK-7567] [SQL] [follow-up] Use a new flag to set output committer…
yhuai May 18, 2015
9dadf01
[SPARK-7673] [SQL] WIP: HadoopFsRelation and ParquetRelation2 perform…
liancheng May 18, 2015
32fbd29
[SPARK-6216] [PYSPARK] check python version of worker with driver
May 18, 2015
0b6f503
[SPARK-7658] [STREAMING] [WEBUI] Update the mouse behaviors for the t…
zsxwing May 18, 2015
fcf90b7
[HOTFIX] Fix ORC build break
marmbrus May 18, 2015
b93c97d
[SPARK-7501] [STREAMING] DAG visualization: show DStream operations
May 18, 2015
6525fc0
[SPARK-7063] when lz4 compression is used, it causes core dump
JihongMA May 18, 2015
eb4632f
[SQL] Fix serializability of ORC table scan
marmbrus May 18, 2015
4fb52f9
[SPARK-7624] Revert #4147
May 18, 2015
0a7a94e
[SPARK-7621] [STREAMING] Report Kafka errors to StreamingListeners
May 19, 2015
3a60038
[SPARK-7692] Updated Kinesis examples
tdas May 19, 2015
d03638c
[SPARK-7681] [MLLIB] Add SparseVector support for gemv
viirya May 19, 2015
c2437de
[SPARK-7150] SparkContext.range() and SQLContext.range()
adrian-wang May 19, 2015
c9fa870
[SPARK-7687] [SQL] DataFrame.describe() should cast all aggregates to…
JoshRosen May 19, 2015
9ebb44f
[HOTFIX]: Java 6 Build Breaks
pwendell May 19, 2015
23cf897
[HOTFIX] Fixing style failures in Kinesis source
pwendell May 19, 2015
6008ec1
[SPARK-7581] [ML] [DOC] User guide for spark.ml PolynomialExpansion
yinxusen May 19, 2015
61f164d
Fixing a few basic typos in the Programming Guide.
dusenberrymw May 19, 2015
27fa88b
[HOTFIX] Revert "[SPARK-7092] Update spark scala version to 2.11.6"
pwendell May 19, 2015
df34793
[SPARK-7723] Fix string interpolation in pipeline examples
tuxdna May 19, 2015
6845cb2
[SPARK-7681] [MLLIB] remove mima excludes for 1.3
mengxr May 19, 2015
32fa611
[SPARK-7704] Updating Programming Guides per SPARK-4397
daisukebe May 19, 2015
fb90273
[SPARK-7047] [ML] ml.Model optional parent support
jkbradley May 19, 2015
7b16e9f
[SPARK-7678] [ML] Fix default random seed in HasSeed
jkbradley May 19, 2015
3c4c1f9
[SPARK-7726] Fix Scaladoc false errors
dragos May 19, 2015
68fb2a4
[SPARK-7586] [ML] [DOC] Add docs of Word2Vec in ml package
yinxusen May 19, 2015
c12dff9
[SPARK-7652] [MLLIB] Update the implementation of naive Bayes predict…
viirya May 19, 2015
4de74d2
[SPARK-7738] [SQL] [PySpark] add reader and writer API in Python
May 19, 2015
bcb1ff8
[SPARK-7662] [SQL] Resolve correct names for generator in projection
chenghao-intel May 19, 2015
2bc5e06
[SPARK-6246] [EC2] fixed support for more than 100 nodes
oleksii-sliusarenko May 19, 2015
3860520
[SPARK-7744] [DOCS] [MLLIB] Distributed matrix" section in MLlib "Dat…
dusenberrymw May 20, 2015
60336e3
[SPARK-7656] [SQL] use CatalystConf in FunctionRegistry
scwf May 20, 2015
b3abf0b
[SPARK-7663] [MLLIB] Add requirement for word2vec model
yinxusen May 20, 2015
09265ad
[SPARK-7320] [SQL] Add Cube / Rollup for dataframe
chenghao-intel May 20, 2015
3ddf051
[SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats.
zzvara May 20, 2015
589b12f
[SPARK-7654] [MLLIB] Migrate MLlib to the DataFrame reader/writer API
mengxr May 20, 2015
98a46f9
[SPARK-6094] [MLLIB] Add MultilabelMetrics in PySpark/MLlib
yanboliang May 20, 2015
b631bf7
[SPARK-7713] [SQL] Use shared broadcast hadoop conf for partitioned t…
yhuai May 20, 2015
2ad4837
[SPARK-7537] [MLLIB] spark.mllib API updates
mengxr May 20, 2015
829f1d9
[SPARK-7579] [ML] [DOC] User guide update for OneHotEncoder
sryza May 20, 2015
6338c40
Revert "[SPARK-7320] [SQL] Add Cube / Rollup for dataframe"
pwendell May 20, 2015
191ee47
[SPARK-7511] [MLLIB] pyspark ml seed param should be random by defaul…
holdenk May 20, 2015
9b84443
[SPARK-7237] [SPARK-7741] [CORE] [STREAMING] Clean more closures that…
May 20, 2015
3c434cb
[SPARK-7767] [STREAMING] Added test for checkpoint serialization in S…
tdas May 20, 2015
7956dd7
[SPARK-7698] Cache and reuse buffers in ExecutorMemoryAllocator when …
JoshRosen May 20, 2015
f2faa7a
[SPARK-7251] Perform sequential scan when iterating over BytesToBytesMap
JoshRosen May 20, 2015
c330e52
[SPARK-7762] [MLLIB] set default value for outputCol
mengxr May 21, 2015
5196eff
[SPARK-7719] Re-add UnsafeShuffleWriterSuite test that was removed fo…
JoshRosen May 21, 2015
a70bf06
[SPARK-7750] [WEBUI] Rename endpoints from `json` to `api` to allow fu…
harishreedharan May 21, 2015
895baf8
[SPARK-7777] [STREAMING] Fix the flaky test in org.apache.spark.strea…
zsxwing May 21, 2015
42c592a
[SPARK-7320] [SQL] Add Cube / Rollup for dataframe
chenghao-intel May 21, 2015
ddec173
[SPARK-7774] [MLLIB] add sqlContext to MLlibTestSparkContext
mengxr May 21, 2015
d0eb9ff
[SPARK-7746][SQL] Add FetchSize parameter for JDBC driver
viirya May 21, 2015
04940c4
[SPARK-7389] [CORE] Tachyon integration improvement
May 21, 2015
8ddcb25
[SPARK-7606] [SQL] [PySpark] add version to Python SQL API docs
May 21, 2015
947ea1c
[SPARK-7753] [MLLIB] Update KernelDensity API
mengxr May 21, 2015
1ee8eb4
[SPARK-7745] Change asserts to requires for user input checks in Spar…
brkyvz May 21, 2015
feb3a9d
[SPARK-7320] [SQL] [Minor] Move the testData into beforeAll()
chenghao-intel May 21, 2015
a25c1ab
[SPARK-7565] [SQL] fix MapType in JsonRDD
May 21, 2015
13348e2
[SPARK-7752] [MLLIB] Use lowercase letters for NaiveBayes.modelType
mengxr May 21, 2015
8730fbb
[SPARK-7749] [SQL] Fixes partition discovery for non-partitioned tables
liancheng May 21, 2015
4b7ff30
[SPARK-7787] [STREAMING] Fix serialization issue of SerializableAWSCr…
tdas May 21, 2015
6e53402
[SPARK-6416] [DOCS] RDD.fold() requires the operator to be commutative
srowen May 21, 2015
699906e
[SPARK-7394][SQL] Add Pandas style cast (astype)
kaka1992 May 21, 2015
4f57200
[SPARK-7793] [MLLIB] Use getOrElse for getting the threshold of SVM m…
coderxiang May 21, 2015
f6c486a
[SQL] [TEST] udf_java_method failed due to jdk version
scwf May 21, 2015
15680ae
[SPARK-7775] YARN AM negative sleep exception
May 21, 2015
6d75ed7
[SPARK-7585] [ML] [DOC] VectorIndexer user guide section
jkbradley May 21, 2015
cdc7c05
[SPARK-7498] [MLLIB] add varargs back to setDefault
mengxr May 21, 2015
311fab6
[SPARK-7722] [STREAMING] Added Kinesis to style checker
tdas May 21, 2015
30f3f55
[SPARK-7763] [SPARK-7616] [SQL] Persists partition columns into metas…
yhuai May 21, 2015
3d0cccc
[SPARK-7478] [SQL] Added SQLContext.getOrCreate
tdas May 21, 2015
6b18cdc
[SPARK-7711] Add a startTime property to match the corresponding one …
holdenk May 21, 2015
5287eec
[SPARK-7718] [SQL] Speed up partitioning by avoiding closure cleaning
May 21, 2015
5a3c04b
[SPARK-7800] isDefined should not marked too early in putNewKey
viirya May 21, 2015
147b6be
[BUILD] Always run SQL tests in master build.
yhuai May 21, 2015
347b501
[SPARK-7737] [SQL] Use leaf dirs having data files to discover partit…
yhuai May 21, 2015
d68ea24
[SPARK-7776] [STREAMING] Added shutdown hook to StreamingContext
tdas May 22, 2015
17791a5
[SPARK-7783] [SQL] [PySpark] add DataFrame.rollup/cube in Python
May 22, 2015
f5db4b4
[SPARK-7794] [MLLIB] update RegexTokenizer default settings
mengxr May 22, 2015
85b9637
[SPARK-7219] [MLLIB] Output feature attributes in HashingTF
mengxr May 22, 2015
956c4c9
[SPARK-7657] [YARN] Add driver logs links in application UI, in clust…
harishreedharan May 22, 2015
e4136ea
[DOCS] [MLLIB] Fixing broken link in MLlib Linear Methods documentation.
dusenberrymw May 22, 2015
8f11c61
[SPARK-7535] [.0] [MLLIB] Audit the pipeline APIs for 1.4
mengxr May 22, 2015
2728c3d
[SPARK-7578] [ML] [DOC] User guide for spark.ml Normalizer, IDF, Stan…
jkbradley May 22, 2015
f6f2eeb
[SPARK-7322][SQL] Window functions in DataFrame
chenghao-intel May 22, 2015
4e5220c
[MINOR] [SQL] Ignores Thrift server UISeleniumSuite
liancheng May 22, 2015
3b68cb0
[SPARK-6743] [SQL] Fix empty projections of cached data
marmbrus May 22, 2015
f490b3b
[SPARK-7404] [ML] Add RegressionEvaluator to spark.ml
May 22, 2015
c63036c
Revert "[BUILD] Always run SQL tests in master build."
pwendell May 22, 2015
509d55a
[SPARK-7574] [ML] [DOC] User guide for OneVsRest
May 22, 2015
eac0069
[SPARK-7766] KryoSerializerInstance reuse is unsafe when auto-reset i…
JoshRosen May 22, 2015
31d5d46
[SPARK-7758] [SQL] Override more configs to avoid failure when connec…
WangTaoTheTonic May 22, 2015
e4aef91
[SPARK-7724] [SQL] Support Intersect/Except in Catalyst DSL.
smola May 22, 2015
126d723
[SPARK-7270] [SQL] Consider dynamic partition when inserting into hiv…
viirya May 22, 2015
33ed8ef
[SPARK-7654][SQL] Move insertInto into reader/writer interface.
rxin May 17, 2015
0841a54
Removed experimental tag for deprecated methods.
rxin May 17, 2015
cf83837
Move insertInto to write. Also, remove the partition columns from Ins…
yhuai May 22, 2015
c636e35
Remove unnecessary empty lines.
yhuai May 22, 2015
10 changes: 5 additions & 5 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -757,12 +757,12 @@ test_that("parquetFile works with multiple input paths", {
test_that("describe() on a DataFrame", {
df <- jsonFile(sqlCtx, jsonPath)
stats <- describe(df, "age")
expect_true(collect(stats)[1, "summary"] == "count")
expect_true(collect(stats)[2, "age"] == 24.5)
expect_true(collect(stats)[3, "age"] == 5.5)
expect_equal(collect(stats)[1, "summary"], "count")
expect_equal(collect(stats)[2, "age"], "24.5")
expect_equal(collect(stats)[3, "age"], "5.5")
stats <- describe(df)
expect_true(collect(stats)[4, "name"] == "Andy")
expect_true(collect(stats)[5, "age"] == 30.0)
expect_equal(collect(stats)[4, "name"], "Andy")
expect_equal(collect(stats)[5, "age"], "30")
})

unlink(parquetPath)
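For context, the same behavior can be seen from the Scala side (a hedged illustration, not part of this diff; it assumes a DataFrame `df` with an "age" column like the test fixture above, and reflects the SPARK-7687 change that the updated SparkR expectations exercise):

```scala
// Illustrative only: after SPARK-7687, DataFrame.describe() returns its
// aggregates as strings, which is why the SparkR expectations above now
// compare against "24.5" and "30" rather than numeric values.
val stats = df.describe("age")
stats.show()
// summary  age
// count    2
// mean     24.5
// stddev   5.5
```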
Large diffs are not rendered by default.

@@ -15,32 +15,21 @@
* limitations under the License.
*/

#dag-viz-graph svg path {
stroke: #444;
stroke-width: 1.5px;
}

#dag-viz-graph svg g.cluster rect {
stroke-width: 1px;
}

#dag-viz-graph svg g.node circle {
fill: #444;
#dag-viz-graph a, #dag-viz-graph a:hover {
text-decoration: none;
}

#dag-viz-graph svg g.node rect {
fill: #C3EBFF;
stroke: #3EC0FF;
stroke-width: 1px;
#dag-viz-graph .label {
font-weight: normal;
text-shadow: none;
}

#dag-viz-graph svg g.node.cached circle {
fill: #444;
#dag-viz-graph svg path {
stroke: #444;
stroke-width: 1.5px;
}

#dag-viz-graph svg g.node.cached rect {
fill: #B3F5C5;
stroke: #56F578;
#dag-viz-graph svg g.cluster rect {
stroke-width: 1px;
}

@@ -61,12 +50,23 @@
stroke-width: 1px;
}

#dag-viz-graph svg.job g.cluster[class*="stage"] rect {
#dag-viz-graph svg.job g.cluster.skipped rect {
fill: #D6D6D6;
stroke: #B7B7B7;
stroke-width: 1px;
}

#dag-viz-graph svg.job g.cluster.stage rect {
fill: #FFFFFF;
stroke: #FF99AC;
stroke-width: 1px;
}

#dag-viz-graph svg.job g.cluster.stage.skipped rect {
stroke: #ADADAD;
stroke-width: 1px;
}

#dag-viz-graph svg.job g#cross-stage-edges path {
fill: none;
}
@@ -75,6 +75,20 @@
fill: #333;
}

#dag-viz-graph svg.job g.cluster.skipped text {
fill: #666;
}

#dag-viz-graph svg.job g.node circle {
fill: #444;
}

#dag-viz-graph svg.job g.node.cached circle {
fill: #A3F545;
stroke: #52C366;
stroke-width: 2px;
}

/* Stage page specific styles */

#dag-viz-graph svg.stage g.cluster rect {
@@ -83,7 +97,7 @@
stroke-width: 1px;
}

#dag-viz-graph svg.stage g.cluster[class*="stage"] rect {
#dag-viz-graph svg.stage g.cluster.stage rect {
fill: #FFFFFF;
stroke: #FFA6B6;
stroke-width: 1px;
@@ -97,11 +111,14 @@
fill: #333;
}

#dag-viz-graph a, #dag-viz-graph a:hover {
text-decoration: none;
#dag-viz-graph svg.stage g.node rect {
fill: #C3EBFF;
stroke: #3EC0FF;
stroke-width: 1px;
}

#dag-viz-graph .label {
font-weight: normal;
text-shadow: none;
#dag-viz-graph svg.stage g.node.cached rect {
fill: #B3F5C5;
stroke: #52C366;
stroke-width: 2px;
}
50 changes: 32 additions & 18 deletions core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -57,9 +57,7 @@ var VizConstants = {
stageSep: 40,
graphPrefix: "graph_",
nodePrefix: "node_",
stagePrefix: "stage_",
clusterPrefix: "cluster_",
stageClusterPrefix: "cluster_stage_"
clusterPrefix: "cluster_"
};

var JobPageVizConstants = {
@@ -133,9 +131,7 @@ function renderDagViz(forJob) {
}

// Render
var svg = graphContainer()
.append("svg")
.attr("class", jobOrStage);
var svg = graphContainer().append("svg").attr("class", jobOrStage);
if (forJob) {
renderDagVizForJob(svg);
} else {
@@ -185,23 +181,32 @@ function renderDagVizForJob(svgContainer) {
var dot = metadata.select(".dot-file").text();
var stageId = metadata.attr("stage-id");
var containerId = VizConstants.graphPrefix + stageId;
// Link each graph to the corresponding stage page (TODO: handle stage attempts)
var stageLink = $("#stage-" + stageId.replace(VizConstants.stagePrefix, "") + "-0")
.find("a")
.attr("href") + "&expandDagViz=true";
var container = svgContainer
.append("a")
.attr("xlink:href", stageLink)
.append("g")
.attr("id", containerId);
var isSkipped = metadata.attr("skipped") == "true";
var container;
if (isSkipped) {
container = svgContainer
.append("g")
.attr("id", containerId)
.attr("skipped", "true");
} else {
// Link each graph to the corresponding stage page (TODO: handle stage attempts)
// Use the link from the stage table so it also works for the history server
var attemptId = 0
var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
.select("a")
.attr("href") + "&expandDagViz=true";
container = svgContainer
.append("a")
.attr("xlink:href", stageLink)
.append("g")
.attr("id", containerId);
}

// Now we need to shift the container for this stage so it doesn't overlap with
// existing ones, taking into account the position and width of the last stage's
// container. We do not need to do this for the first stage of this job.
if (i > 0) {
var existingStages = svgContainer
.selectAll("g.cluster")
.filter("[class*=\"" + VizConstants.stageClusterPrefix + "\"]");
var existingStages = svgContainer.selectAll("g.cluster.stage")
if (!existingStages.empty()) {
var lastStage = d3.select(existingStages[0].pop());
var lastStageWidth = toFloat(lastStage.select("rect").attr("width"));
@@ -214,6 +219,12 @@
// Actually render the stage
renderDot(dot, container, true);

// Mark elements as skipped if appropriate. Unfortunately we need to mark all
// elements instead of the parent container because of CSS override rules.
if (isSkipped) {
container.selectAll("g").classed("skipped", true);
}

// Round corners on rectangles
container
.selectAll("rect")
@@ -243,6 +254,9 @@ function renderDot(dot, container, forJob) {
var renderer = new dagreD3.render();
preprocessGraphLayout(g, forJob);
renderer(container, g);

// Find the stage cluster and mark it for styling and post-processing
container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
}

/* -------------------- *
82 changes: 77 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -678,7 +678,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* Note: Return statements are NOT allowed in the given body.
*/
private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)

// Methods for creating RDDs

@@ -697,6 +697,78 @@
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

/**
* Creates a new RDD[Long] containing elements from `start` to `end`(exclusive), increased by
* `step` every element.
*
* @note if we need to cache this RDD, we should make sure each partition does not exceed limit.
*
* @param start the start value.
* @param end the end value.
* @param step the incremental step
* @param numSlices the partition number of the new RDD.
* @return
*/
def range(
start: Long,
end: Long,
step: Long = 1,
numSlices: Int = defaultParallelism): RDD[Long] = withScope {
assertNotStopped()
// when step is 0, range will run infinitely
require(step != 0, "step cannot be 0")
val numElements: BigInt = {
val safeStart = BigInt(start)
val safeEnd = BigInt(end)
if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
(safeEnd - safeStart) / step
} else {
// the remainder has the same sign with range, could add 1 more
(safeEnd - safeStart) / step + 1
}
}
parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex((i, _) => {
val partitionStart = (i * numElements) / numSlices * step + start
val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
def getSafeMargin(bi: BigInt): Long =
if (bi.isValidLong) {
bi.toLong
} else if (bi > 0) {
Long.MaxValue
} else {
Long.MinValue
}
val safePartitionStart = getSafeMargin(partitionStart)
val safePartitionEnd = getSafeMargin(partitionEnd)

new Iterator[Long] {
private[this] var number: Long = safePartitionStart
private[this] var overflow: Boolean = false

override def hasNext =
if (!overflow) {
if (step > 0) {
number < safePartitionEnd
} else {
number > safePartitionEnd
}
} else false

override def next() = {
val ret = number
number += step
if (number < ret ^ step < 0) {
// we have Long.MaxValue + Long.MaxValue < Long.MaxValue
// and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a step
// back, we are pretty sure that we have an overflow.
overflow = true
}
ret
}
}
})
}

/** Distribute a local Scala collection to form an RDD.
*
* This method is identical to `parallelize`.
@@ -1087,8 +1159,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
withScope {
assertNotStopped()
val kc = kcf()
val vc = vcf()
val kc = clean(kcf)()
val vc = clean(vcf)()
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
@@ -1812,7 +1884,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* @param f the closure to clean
* @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
* @throws <tt>SparkException<tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
* @throws SparkException if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
* serializable
*/
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
@@ -1919,7 +1991,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser, applicationAttemptId))
startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
}

/** Post the application end event */
13 changes: 10 additions & 3 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -386,9 +386,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {

/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
* given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
* modify t1 and return it as its result value to avoid object allocation; however, it should not
* modify t2.
* given associative and commutative function and a neutral "zero value". The function
* op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
* allocation; however, it should not modify t2.
*
* This behaves somewhat differently from fold operations implemented for non-distributed
* collections in functional languages like Scala. This fold operation may be applied to
* partitions individually, and then fold those results into the final result, rather than
* apply the fold to each element sequentially in some defined ordering. For functions
* that are not commutative, the result may differ from that of a fold applied to a
* non-distributed collection.
*/
def fold(zeroValue: T)(f: JFunction2[T, T, T]): T =
rdd.fold(zeroValue)(f)
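A minimal sketch of the associativity/commutativity caveat documented above, written against the Scala RDD API for brevity (illustrative only; assumes a live SparkContext `sc`):

```scala
// Illustrative only, assuming a live SparkContext `sc`.
val rdd = sc.parallelize(1 to 4, numSlices = 2)

// Addition is associative and commutative, so the result is deterministic:
rdd.fold(0)(_ + _)   // always 10

// Subtraction is neither, so the result depends on how elements are split into
// partitions and on the order in which per-partition results are merged:
rdd.fold(0)(_ - _)   // varies across partitionings; not well-defined
```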
@@ -47,6 +47,7 @@ private[spark] class PythonRDD(
pythonIncludes: JList[String],
preservePartitoning: Boolean,
pythonExec: String,
pythonVer: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: Accumulator[JList[Array[Byte]]])
extends RDD[Array[Byte]](parent) {
@@ -210,6 +211,8 @@
val dataOut = new DataOutputStream(stream)
// Partition index
dataOut.writeInt(split.index)
// Python version of driver
PythonRDD.writeUTF(pythonVer, dataOut)
// sparkFilesDir
PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
// Python includes (*.zip and *.egg files)
@@ -50,8 +50,15 @@ private[spark] object PythonUtils {
/**
* Convert list of T into seq of T (for calling API with varargs)
*/
def toSeq[T](cols: JList[T]): Seq[T] = {
cols.toList.toSeq
def toSeq[T](vs: JList[T]): Seq[T] = {
vs.toList.toSeq
}

/**
* Convert list of T into array of T (for calling API with array)
*/
def toArray[T](vs: JList[T]): Array[T] = {
vs.toArray().asInstanceOf[Array[T]]
}

/**