
Commit 8ef8751

Merge branch 'master' of git://git.apache.org/spark into SPARK-2964
2 parents: 26e7c95 + 4243bb6


62 files changed: +1931 / -368 lines


bin/spark-class

Lines changed: 6 additions & 1 deletion

@@ -105,9 +105,14 @@ else
     exit 1
   fi
 fi
+JAVA_VERSION=$($RUNNER -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

 # Set JAVA_OPTS to be able to load native libraries and to set heap size
-JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS"
+if [ "$JAVA_VERSION" -ge 18 ]; then
+  JAVA_OPTS="$OUR_JAVA_OPTS"
+else
+  JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS"
+fi
 JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"

 # Load extra JAVA_OPTS from conf/java-opts, if it exists
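The sed expression above reduces the first line of `java -version` output to a two-digit number (for example, `java version "1.8.0_20"` becomes `18`), so the numeric comparison against 18 detects Java 8 and later, where PermGen and the -XX:MaxPermSize flag no longer exist; bin/spark-class2.cmd below performs the same check for Windows. A hypothetical Scala mirror of that parsing, shown only to illustrate the transformation (not part of the commit):

object VersionParseSketch {
  def main(args: Array[String]): Unit = {
    // First line printed by `java -version` (on stderr), e.g. for Java 8:
    val versionLine = "java version \"1.8.0_20\""
    // Same capture groups as the sed expression: major and minor joined together.
    val pattern = "java version \"(\\d+)\\.(\\d+)\\..*\"".r
    versionLine match {
      case pattern(major, minor) => println(major + minor)  // prints "18"
      case _                     => println("unrecognized version string")
    }
  }
}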

bin/spark-class2.cmd

Lines changed: 7 additions & 1 deletion

@@ -77,7 +77,13 @@ rem All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SP
 )

 rem Set JAVA_OPTS to be able to load native libraries and to set heap size
-set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
+for /f "tokens=3" %%i in ('java -version 2^>^&1 ^| find "version"') do set jversion=%%i
+for /f "tokens=1 delims=_" %%i in ("%jversion:~1,-1%") do set jversion=%%i
+if "%jversion%" geq "1.8.0" (
+  set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
+) else (
+  set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
+)
 rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!

 rem Test whether the user has built Spark

bin/spark-shell

Lines changed: 18 additions & 18 deletions

@@ -22,7 +22,7 @@

 cygwin=false
 case "`uname`" in
-    CYGWIN*) cygwin=true;;
+  CYGWIN*) cygwin=true;;
 esac

 # Enter posix mode for bash
@@ -32,9 +32,9 @@ set -o posix
 FWDIR="$(cd `dirname $0`/..; pwd)"

 function usage() {
-    echo "Usage: ./bin/spark-shell [options]"
-    $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
-    exit 0
+  echo "Usage: ./bin/spark-shell [options]"
+  $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  exit 0
 }

 if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
@@ -46,20 +46,20 @@ SUBMIT_USAGE_FUNCTION=usage
 gatherSparkSubmitOpts "$@"

 function main() {
-    if $cygwin; then
-        # Workaround for issue involving JLine and Cygwin
-        # (see http://sourceforge.net/p/jline/bugs/40/).
-        # If you're using the Mintty terminal emulator in Cygwin, may need to set the
-        # "Backspace sends ^H" setting in "Keys" section of the Mintty options
-        # (see https://github.com/sbt/sbt/issues/562).
-        stty -icanon min 1 -echo > /dev/null 2>&1
-        export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
-        $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"
-        stty icanon echo > /dev/null 2>&1
-    else
-        export SPARK_SUBMIT_OPTS
-        $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"
-    fi
+  if $cygwin; then
+    # Workaround for issue involving JLine and Cygwin
+    # (see http://sourceforge.net/p/jline/bugs/40/).
+    # If you're using the Mintty terminal emulator in Cygwin, may need to set the
+    # "Backspace sends ^H" setting in "Keys" section of the Mintty options
+    # (see https://github.com/sbt/sbt/issues/562).
+    stty -icanon min 1 -echo > /dev/null 2>&1
+    export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
+    $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"
+    stty icanon echo > /dev/null 2>&1
+  else
+    export SPARK_SUBMIT_OPTS
+    $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"
+  fi
 }

 # Copy restore-TTY-on-exit functions from Scala script so spark-shell exits properly even in

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 17 additions & 0 deletions

@@ -749,6 +749,23 @@ private[spark] object PythonRDD extends Logging {
       }
     }
   }
+
+  /**
+   * Convert an RDD of serialized Python objects to RDD of objects, that is usable by PySpark.
+   */
+  def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
+    pyRDD.rdd.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      iter.flatMap { row =>
+        val obj = unpickle.loads(row)
+        if (batched) {
+          obj.asInstanceOf[JArrayList[_]]
+        } else {
+          Seq(obj)
+        }
+      }
+    }.toJavaRDD()
+  }
 }

 private
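The new `pythonToJava` helper unpickles each record and, when `batched` is true, flattens every row (a pickled batch) into its individual elements. A minimal, self-contained sketch of that flattening pattern, using plain Scala collections in place of Spark and the pickle library (the names below are illustrative only):

import java.util.{ArrayList => JArrayList, Arrays => JArrays}
import scala.collection.JavaConverters._

object FlattenSketch {
  // Mirrors the batched/non-batched branch: a batch contributes all of its
  // elements, a single object contributes just itself.
  def flatten(rows: Iterator[Any], batched: Boolean): Iterator[Any] =
    rows.flatMap { obj =>
      if (batched) obj.asInstanceOf[JArrayList[_]].asScala
      else Seq(obj)
    }

  def main(args: Array[String]): Unit = {
    val batch = new JArrayList[Any](JArrays.asList(1, 2, 3))
    println(flatten(Iterator(batch), batched = true).toList)     // List(1, 2, 3)
    println(flatten(Iterator("a", "b"), batched = false).toList) // List(a, b)
  }
}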

core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala

Lines changed: 2 additions & 2 deletions

@@ -64,15 +64,15 @@ object CommandUtils extends Logging {
       Seq()
     }

-    val permGenOpt = Seq("-XX:MaxPermSize=128m")
-
     // Figure out our classpath with the external compute-classpath script
     val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
     val classPath = Utils.executeAndGetOutput(
       Seq(sparkHome + "/bin/compute-classpath" + ext),
       extraEnvironment = command.environment)
     val userClassPath = command.classPathEntries ++ Seq(classPath)

+    val javaVersion = System.getProperty("java.version")
+    val permGenOpt = if (!javaVersion.startsWith("1.8")) Some("-XX:MaxPermSize=128m") else None
     Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
       permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
   }
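In the new CommandUtils code, `permGenOpt` is an `Option[String]` concatenated directly into the argument list, so it contributes either one flag or nothing. A small standalone sketch of that behaviour (the classpath and memory values below are made up for illustration):

object PermGenOptSketch {
  // Builds a JVM argument list the same way the changed code does:
  // the MaxPermSize flag is only added when we are not running on Java 8.
  def javaOpts(javaVersion: String): Seq[String] = {
    val permGenOpt =
      if (!javaVersion.startsWith("1.8")) Some("-XX:MaxPermSize=128m") else None
    Seq("-cp", "app.jar") ++ permGenOpt ++ Seq("-Xms512m", "-Xmx512m")
  }

  def main(args: Array[String]): Unit = {
    println(javaOpts("1.7.0_71"))  // List(-cp, app.jar, -XX:MaxPermSize=128m, -Xms512m, -Xmx512m)
    println(javaOpts("1.8.0_20"))  // List(-cp, app.jar, -Xms512m, -Xmx512m)
  }
}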

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 2 additions & 2 deletions

@@ -851,8 +851,8 @@ private[spark] class ConnectionManager(
       messageStatuses.synchronized {
         messageStatuses.remove(message.id).foreach ( s => {
           promise.failure(
-            new IOException(s"sendMessageReliably failed because ack " +
-              "was not received within ${ackTimeout} sec"))
+            new IOException("sendMessageReliably failed because ack " +
+              s"was not received within $ackTimeout sec"))
         })
       }
     }
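The fix moves the `s` interpolator to the string that actually contains `$ackTimeout`; in the original code the second string was not interpolated, so the literal text `${ackTimeout}` would have appeared in the exception message. A tiny sketch of the difference (the value 60 is arbitrary):

object InterpolationSketch {
  def main(args: Array[String]): Unit = {
    val ackTimeout = 60
    // Plain string literal: no interpolation happens.
    println("ack was not received within ${ackTimeout} sec")
    // s-interpolated string: the variable's value is substituted.
    println(s"ack was not received within $ackTimeout sec")
  }
}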

core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@ import cern.jet.stat.Probability
 import org.apache.spark.util.StatCounter

 /**
- * An ApproximateEvaluator for sums. It estimates the mean and the cont and multiplies them
+ * An ApproximateEvaluator for sums. It estimates the mean and the count and multiplies them
  * together, then uses the formula for the variance of two independent random variables to get
  * a variance for the result and compute a confidence interval.
  */
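For reference, the identity that comment refers to is the standard variance formula for a product of independent random variables $X$ and $Y$:

$\mathrm{Var}(XY) = \mathrm{E}[X]^2 \, \mathrm{Var}(Y) + \mathrm{E}[Y]^2 \, \mathrm{Var}(X) + \mathrm{Var}(X) \, \mathrm{Var}(Y)$

applied with the estimated count and the estimated mean playing the roles of $X$ and $Y$.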

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

Lines changed: 1 addition & 6 deletions

@@ -17,12 +17,11 @@

 package org.apache.spark.shuffle.sort

-import java.io.{BufferedOutputStream, File, FileOutputStream, DataOutputStream}
+import java.io.File

 import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleWriter, BaseShuffleHandle}
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
@@ -37,10 +36,6 @@ private[spark] class SortShuffleWriter[K, V, C](
   private val numPartitions = dep.partitioner.numPartitions

   private val blockManager = SparkEnv.get.blockManager
-  private val ser = Serializer.getSerializer(dep.serializer.orNull)
-
-  private val conf = SparkEnv.get.conf
-  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

   private var sorter: ExternalSorter[K, V, _] = null
   private var outputFile: File = null

docs/mllib-dimensionality-reduction.md

Lines changed: 23 additions & 6 deletions

@@ -11,7 +11,7 @@ displayTitle: <a href="mllib-guide.html">MLlib</a> - Dimensionality Reduction
 of reducing the number of variables under consideration.
 It can be used to extract latent features from raw and noisy features
 or compress data while maintaining the structure.
-MLlib provides support for dimensionality reduction on tall-and-skinny matrices.
+MLlib provides support for dimensionality reduction on the <a href="mllib-basics.html#rowmatrix">RowMatrix</a> class.

 ## Singular value decomposition (SVD)

@@ -39,8 +39,26 @@ If we keep the top $k$ singular values, then the dimensions of the resulting low
 * `$\Sigma$`: `$k \times k$`,
 * `$V$`: `$n \times k$`.

-MLlib provides SVD functionality to row-oriented matrices that have only a few columns,
-say, less than $1000$, but many rows, i.e., *tall-and-skinny* matrices.
+### Performance
+We assume $n$ is smaller than $m$. The singular values and the right singular vectors are derived
+from the eigenvalues and the eigenvectors of the Gramian matrix $A^T A$. The matrix
+storing the left singular vectors $U$, is computed via matrix multiplication as
+$U = A (V S^{-1})$, if requested by the user via the computeU parameter.
+The actual method to use is determined automatically based on the computational cost:
+
+* If $n$ is small ($n < 100$) or $k$ is large compared with $n$ ($k > n / 2$), we compute the Gramian matrix
+first and then compute its top eigenvalues and eigenvectors locally on the driver.
+This requires a single pass with $O(n^2)$ storage on each executor and on the driver, and
+$O(n^2 k)$ time on the driver.
+* Otherwise, we compute $(A^T A) v$ in a distributive way and send it to
+<a href="http://www.caam.rice.edu/software/ARPACK/">ARPACK</a> to
+compute $(A^T A)$'s top eigenvalues and eigenvectors on the driver node. This requires $O(k)$
+passes, $O(n)$ storage on each executor, and $O(n k)$ storage on the driver.
+
+### SVD Example
+
+MLlib provides SVD functionality to row-oriented matrices, provided in the
+<a href="mllib-basics.html#rowmatrix">RowMatrix</a> class.

 <div class="codetabs">
 <div data-lang="scala" markdown="1">
@@ -124,9 +142,8 @@ MLlib supports PCA for tall-and-skinny matrices stored in row-oriented format.
 <div class="codetabs">
 <div data-lang="scala" markdown="1">

-The following code demonstrates how to compute principal components on a tall-and-skinny `RowMatrix`
+The following code demonstrates how to compute principal components on a `RowMatrix`
 and use them to project the vectors into a low-dimensional space.
-The number of columns should be small, e.g, less than 1000.

 {% highlight scala %}
 import org.apache.spark.mllib.linalg.Matrix
@@ -144,7 +161,7 @@ val projected: RowMatrix = mat.multiply(pc)

 <div data-lang="java" markdown="1">

-The following code demonstrates how to compute principal components on a tall-and-skinny `RowMatrix`
+The following code demonstrates how to compute principal components on a `RowMatrix`
 and use them to project the vectors into a low-dimensional space.
 The number of columns should be small, e.g, less than 1000.
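The "SVD Example" referenced above sits outside the context shown in this hunk; as a sketch, calling `computeSVD` on a `RowMatrix` looks roughly like the following, with a small made-up matrix (the exact snippet in the docs may differ):

import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Assumes an existing SparkContext named `sc`, as in the other doc examples.
val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 7.0),
  Vectors.dense(2.0, 3.0, 4.0),
  Vectors.dense(5.0, 6.0, 9.0)))
val mat = new RowMatrix(rows)

// Top 2 singular values/vectors; computeU = true also materializes U = A (V S^{-1}),
// as described in the Performance section above.
val svd = mat.computeSVD(2, computeU = true)
val U: RowMatrix = svd.U  // left singular vectors, stored as a distributed RowMatrix
val s: Vector = svd.s     // singular values, in descending order
val V: Matrix = svd.V     // right singular vectors, a local dense matrix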

docs/mllib-feature-extraction.md

Lines changed: 107 additions & 2 deletions

@@ -1,7 +1,7 @@
 ---
 layout: global
-title: Feature Extraction - MLlib
-displayTitle: <a href="mllib-guide.html">MLlib</a> - Feature Extraction
+title: Feature Extraction and Transformation - MLlib
+displayTitle: <a href="mllib-guide.html">MLlib</a> - Feature Extraction and Transformation
 ---

 * Table of contents
@@ -148,3 +148,108 @@ for((synonym, cosineSimilarity) <- synonyms) {
 {% endhighlight %}
 </div>
 </div>
+
+## StandardScaler
+
+Standardizes features by scaling to unit variance and/or removing the mean using column summary
+statistics on the samples in the training set. This is a very common pre-processing step.
+
+For example, RBF kernel of Support Vector Machines or the L1 and L2 regularized linear models
+typically work better when all features have unit variance and/or zero mean.
+
+Standardization can improve the convergence rate during the optimization process, and also prevents
+against features with very large variances exerting an overly large influence during model training.
+
+### Model Fitting
+
+[`StandardScaler`](api/scala/index.html#org.apache.spark.mllib.feature.StandardScaler) has the
+following parameters in the constructor:
+
+* `withMean` False by default. Centers the data with mean before scaling. It will build a dense
+output, so this does not work on sparse input and will raise an exception.
+* `withStd` True by default. Scales the data to unit variance.
+
+We provide a [`fit`](api/scala/index.html#org.apache.spark.mllib.feature.StandardScaler) method in
+`StandardScaler` which can take an input of `RDD[Vector]`, learn the summary statistics, and then
+return a model which can transform the input dataset into unit variance and/or zero mean features
+depending how we configure the `StandardScaler`.
+
+This model implements [`VectorTransformer`](api/scala/index.html#org.apache.spark.mllib.feature.VectorTransformer)
+which can apply the standardization on a `Vector` to produce a transformed `Vector` or on
+an `RDD[Vector]` to produce a transformed `RDD[Vector]`.
+
+Note that if the variance of a feature is zero, it will return default `0.0` value in the `Vector`
+for that feature.
+
+### Example
+
+The example below demonstrates how to load a dataset in libsvm format, and standardize the features
+so that the new features have unit variance and/or zero mean.
+
+<div class="codetabs">
+<div data-lang="scala">
+{% highlight scala %}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.feature.StandardScaler
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.util.MLUtils
+
+val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
+
+val scaler1 = new StandardScaler().fit(data.map(x => x.features))
+val scaler2 = new StandardScaler(withMean = true, withStd = true).fit(data.map(x => x.features))
+
+// data1 will be unit variance.
+val data1 = data.map(x => (x.label, scaler1.transform(x.features)))
+
+// Without converting the features into dense vectors, transformation with zero mean will raise
+// exception on sparse vector.
+// data2 will be unit variance and zero mean.
+val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))
+{% endhighlight %}
+</div>
+</div>
+
+## Normalizer
+
+Normalizer scales individual samples to have unit $L^p$ norm. This is a common operation for text
+classification or clustering. For example, the dot product of two $L^2$ normalized TF-IDF vectors
+is the cosine similarity of the vectors.
+
+[`Normalizer`](api/scala/index.html#org.apache.spark.mllib.feature.Normalizer) has the following
+parameter in the constructor:
+
+* `p` Normalization in $L^p$ space, $p = 2$ by default.
+
+`Normalizer` implements [`VectorTransformer`](api/scala/index.html#org.apache.spark.mllib.feature.VectorTransformer)
+which can apply the normalization on a `Vector` to produce a transformed `Vector` or on
+an `RDD[Vector]` to produce a transformed `RDD[Vector]`.
+
+Note that if the norm of the input is zero, it will return the input vector.
+
+### Example
+
+The example below demonstrates how to load a dataset in libsvm format, and normalizes the features
+with $L^2$ norm, and $L^\infty$ norm.
+
+<div class="codetabs">
+<div data-lang="scala">
+{% highlight scala %}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.feature.Normalizer
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.util.MLUtils
+
+val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
+
+val normalizer1 = new Normalizer()
+val normalizer2 = new Normalizer(p = Double.PositiveInfinity)
+
+// Each sample in data1 will be normalized using $L^2$ norm.
+val data1 = data.map(x => (x.label, normalizer1.transform(x.features)))
+
+// Each sample in data2 will be normalized using $L^\infty$ norm.
+val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
+{% endhighlight %}
+</div>
+</div>
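The Normalizer section added above states that the dot product of two $L^2$-normalized vectors equals their cosine similarity. A quick standalone check of that identity in plain Scala (no MLlib types, made-up numbers):

object CosineSketch {
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  def l2Normalize(v: Array[Double]): Array[Double] = {
    val norm = math.sqrt(v.map(x => x * x).sum)
    v.map(_ / norm)
  }

  def main(args: Array[String]): Unit = {
    val a = Array(1.0, 2.0, 0.0)
    val b = Array(2.0, 1.0, 1.0)
    // Cosine similarity computed directly ...
    val cosine = dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))
    // ... and as a plain dot product after L2 normalization.
    val viaNormalized = dot(l2Normalize(a), l2Normalize(b))
    println(cosine)         // ~0.7303
    println(viaNormalized)  // same value, up to floating-point rounding
  }
}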
