Merged
Changes from all commits (18 commits)
156ac27
[SPARK-10858] YARN: archives/jar/files rename with # doesn't work unl
Oct 9, 2015
6dc23e6
[SPARK-10960] [SQL] SQL with windowing function should be able to ref…
viirya Oct 12, 2015
cd55fbc
[SPARK-11023] [YARN] Avoid creating URIs from local paths directly.
Oct 12, 2015
be68a4b
[SPARK-10973] [ML] [PYTHON] Fix IndexError exception on SparseVector …
zero323 Oct 12, 2015
ebbff39
[SPARK-11056] Improve documentation of SBT build.
kayousterhout Oct 12, 2015
2217f4f
Revert "[SPARK-10959] [PYSPARK] StreamingLogisticRegressionWithSGD do…
jkbradley Oct 12, 2015
47bc6c0
[SPARK-11026] [YARN] spark.yarn.user.classpath.first does work for 's…
lianhuiwang Oct 13, 2015
edc5095
[SPARK-11009] [SQL] fix wrong result of Window function in cluster mode
Oct 13, 2015
77eeaad
[SPARK-10932] [PROJECT INFRA] Port two minor changes to release-build…
JoshRosen Oct 13, 2015
15d2736
[SPARK-10959] [PYSPARK] StreamingLogisticRegressionWithSGD does not t…
BryanCutler Oct 13, 2015
94e6d8f
[SPARK-10389] [SQL] [1.5] support order by non-attribute grouping exp…
cloud-fan Oct 13, 2015
3e9d56e
[SPARK-10981] [SPARKR] SparkR Join improvements
mfliu Oct 14, 2015
ad5bf0e
[SPARK-10619] Can't sort columns on Executor Page
Oct 14, 2015
f366249
[SPARK-10845] [SQL] Makes spark.sql.hive.version a SQLConfEntry
liancheng Sep 27, 2015
30eea40
[SPARK-10577] [PYSPARK] DataFrame hint for broadcast join
snowmoon-zhang Sep 22, 2015
c27e190
[SPARK-8386] [SQL] add write.mode for insertIntoJDBC when the parm ov…
huaxingao Oct 14, 2015
0b31268
Merge branch 'branch-1.5' of github.com:apache/spark into csd-1.5
markhamstra Oct 15, 2015
ab6daa5
fixed mismerge
markhamstra Oct 15, 2015
13 changes: 9 additions & 4 deletions R/pkg/R/DataFrame.R
@@ -1352,9 +1352,10 @@ setMethod("where",
#' @param x A Spark DataFrame
#' @param y A Spark DataFrame
#' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
#' Column expression. If joinExpr is omitted, join() wil perform a Cartesian join
#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join
#' @param joinType The type of join to perform. The following join types are available:
#' 'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'. The default joinType is "inner".
#' 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left',
#' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner".
#' @return A DataFrame containing the result of the join operation.
#' @rdname join
#' @name join
@@ -1379,11 +1380,15 @@ setMethod("join",
if (is.null(joinType)) {
sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc)
} else {
if (joinType %in% c("inner", "outer", "left_outer", "right_outer", "semijoin")) {
if (joinType %in% c("inner", "outer", "full", "fullouter",
"leftouter", "left_outer", "left",
"rightouter", "right_outer", "right", "leftsemi")) {
joinType <- gsub("_", "", joinType)
sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType)
} else {
stop("joinType must be one of the following types: ",
"'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'")
"'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left',
'rightouter', 'right_outer', 'right', 'leftsemi'")
}
}
}
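For context, the join-type strings documented above are passed through to the underlying DataFrame API (the R code strips underscores before forwarding them). A rough, illustrative sketch only, not part of this patch: a similar set of strings is accepted as the `how` argument in PySpark, assuming a running SQLContext named sqlContext.

    # illustrative sketch; data and names are made up
    df1 = sqlContext.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
    df2 = sqlContext.createDataFrame([("Alice", 5)], ["name", "score"])
    joined = df1.join(df2, df1.name == df2.name, "left_outer")  # e.g. "inner", "outer", "leftsemi", ...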
27 changes: 25 additions & 2 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -952,7 +952,7 @@ test_that("join() and merge() on a DataFrame", {
expect_equal(names(joined2), c("age", "name", "name", "test"))
expect_equal(count(joined2), 3)

joined3 <- join(df, df2, df$name == df2$name, "right_outer")
joined3 <- join(df, df2, df$name == df2$name, "rightouter")
expect_equal(names(joined3), c("age", "name", "name", "test"))
expect_equal(count(joined3), 4)
expect_true(is.na(collect(orderBy(joined3, joined3$age))$age[2]))
@@ -963,11 +963,34 @@ test_that("join() and merge() on a DataFrame", {
expect_equal(count(joined4), 4)
expect_equal(collect(orderBy(joined4, joined4$name))$newAge[3], 24)

joined5 <- join(df, df2, df$name == df2$name, "leftouter")
expect_equal(names(joined5), c("age", "name", "name", "test"))
expect_equal(count(joined5), 3)
expect_true(is.na(collect(orderBy(joined5, joined5$age))$age[1]))

joined6 <- join(df, df2, df$name == df2$name, "inner")
expect_equal(names(joined6), c("age", "name", "name", "test"))
expect_equal(count(joined6), 3)

joined7 <- join(df, df2, df$name == df2$name, "leftsemi")
expect_equal(names(joined7), c("age", "name"))
expect_equal(count(joined7), 3)

joined8 <- join(df, df2, df$name == df2$name, "left_outer")
expect_equal(names(joined8), c("age", "name", "name", "test"))
expect_equal(count(joined8), 3)
expect_true(is.na(collect(orderBy(joined8, joined8$age))$age[1]))

joined9 <- join(df, df2, df$name == df2$name, "right_outer")
expect_equal(names(joined9), c("age", "name", "name", "test"))
expect_equal(count(joined9), 4)
expect_true(is.na(collect(orderBy(joined9, joined9$age))$age[2]))

merged <- select(merge(df, df2, df$name == df2$name, "outer"),
alias(df$age + 5, "newAge"), df$name, df2$test)
expect_equal(names(merged), c("newAge", "name", "test"))
expect_equal(count(merged), 4)
expect_equal(collect(orderBy(merged, joined4$name))$newAge[3], 24)
expect_equal(collect(orderBy(merged, merged$name))$newAge[3], 24)
})

test_that("toJSON() returns an RDD of the correct values", {
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -31,6 +31,7 @@ import org.apache.spark.ui.scope.RDDOperationGraph
private[spark] object UIUtils extends Logging {
val TABLE_CLASS_NOT_STRIPED = "table table-bordered table-condensed"
val TABLE_CLASS_STRIPED = TABLE_CLASS_NOT_STRIPED + " table-striped"
val TABLE_CLASS_STRIPED_SORTABLE = TABLE_CLASS_STRIPED + " sortable"

// SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
@@ -62,7 +62,7 @@ private[ui] class ExecutorsPage(
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty

val execTable =
<table class={UIUtils.TABLE_CLASS_STRIPED}>
<table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
<thead>
<th>Executor ID</th>
<th>Address</th>
@@ -50,7 +50,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
hasBytesSpilled = data.hasBytesSpilled
})

<table class={UIUtils.TABLE_CLASS_STRIPED}>
<table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
<thead>
<th>Executor ID</th>
<th>Address</th>
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1751,6 +1751,13 @@ private[spark] object Utils extends Logging {
if (uri.getScheme() != null) {
return uri
}
// make sure to handle if the path has a fragment (applies to yarn
// distributed cache)
if (uri.getFragment() != null) {
val absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI()
return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(),
uri.getFragment())
}
} catch {
case e: URISyntaxException =>
}
6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -384,7 +384,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assertResolves("hdfs:/root/spark.jar", "hdfs:/root/spark.jar")
assertResolves("hdfs:///root/spark.jar#app.jar", "hdfs:/root/spark.jar#app.jar")
assertResolves("spark.jar", s"file:$cwd/spark.jar")
assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar%23app.jar")
assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar#app.jar")
assertResolves("path to/file.txt", s"file:$cwd/path%20to/file.txt")
if (Utils.isWindows) {
assertResolves("C:\\path\\to\\file.txt", "file:/C:/path/to/file.txt")
@@ -414,10 +414,10 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2")
assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5,path to/jar6",
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4%23jar5,file:$cwd/path%20to/jar6")
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5,file:$cwd/path%20to/jar6")
if (Utils.isWindows) {
assertResolves("""hdfs:/jar1,file:/jar2,jar3,C:\pi.py#py.pi,C:\path to\jar4""",
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py%23py.pi,file:/C:/path%20to/jar4")
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi,file:/C:/path%20to/jar4")
}
}

10 changes: 7 additions & 3 deletions dev/create-release/release-build.sh
@@ -70,7 +70,7 @@ GIT_REF=${GIT_REF:-master}
# Destination directory parent on remote server
REMOTE_PARENT_DIR=${REMOTE_PARENT_DIR:-/home/$ASF_USERNAME/public_html}

SSH="ssh -o StrictHostKeyChecking=no -i $ASF_RSA_KEY"
SSH="ssh -o ConnectTimeout=300 -o StrictHostKeyChecking=no -i $ASF_RSA_KEY"
GPG="gpg --no-tty --batch"
NEXUS_ROOT=https://repository.apache.org/service/local/staging
NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging uploads
@@ -141,8 +141,12 @@ if [[ "$1" == "package" ]]; then

export ZINC_PORT=$ZINC_PORT
echo "Creating distribution: $NAME ($FLAGS)"
./make-distribution.sh --name $NAME --tgz $FLAGS -DzincPort=$ZINC_PORT 2>&1 > \
../binary-release-$NAME.log

# Get maven home set by MVN
MVN_HOME=`$MVN -version 2>&1 | grep 'Maven home' | awk '{print $NF}'`

./make-distribution.sh --name $NAME --mvn $MVN_HOME/bin/mvn --tgz $FLAGS \
-DzincPort=$ZINC_PORT 2>&1 > ../binary-release-$NAME.log
cd ..
cp spark-$SPARK_VERSION-bin-$NAME/spark-$SPARK_VERSION-bin-$NAME.tgz .

5 changes: 5 additions & 0 deletions docs/building-spark.md
@@ -205,6 +205,11 @@ can be set to control the SBT build. For example:

build/sbt -Pyarn -Phadoop-2.3 assembly

To avoid the overhead of launching sbt each time you need to re-compile, you can launch sbt
in interactive mode by running `build/sbt`, and then run all build commands at the command
prompt. For more recommendations on reducing build time, refer to the
[wiki page](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-ReducingBuildTimes).

# Testing with SBT

Some of the tests require Spark to be packaged first, so always run `build/sbt assembly` the first time. The following is an example of a correct (build, test) sequence:
2 changes: 1 addition & 1 deletion python/pyspark/mllib/classification.py
@@ -633,7 +633,7 @@ def update(rdd):
self._model = LogisticRegressionWithSGD.train(
rdd, self.numIterations, self.stepSize,
self.miniBatchFraction, self._model.weights,
regParam=self.regParam, convergenceTol=self.convergenceTol)
regParam=self.regParam)

dstream.foreachRDD(update)

3 changes: 3 additions & 0 deletions python/pyspark/mllib/linalg/__init__.py
@@ -732,6 +732,9 @@ def __getitem__(self, index):
raise ValueError("Index %d out of bounds." % index)

insert_index = np.searchsorted(inds, index)
if insert_index >= inds.size:
return 0.

row_ind = inds[insert_index]
if row_ind == index:
return vals[insert_index]
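The two added lines make indexing past the last stored position return 0.0 instead of raising an IndexError (SPARK-10973). An illustrative sketch of the resulting behaviour, mirroring the updated tests below:

    from pyspark.mllib.linalg import SparseVector

    sv = SparseVector(5, {1: 1.0, 3: 2.0})  # size 5, stored values at indices 1 and 3
    sv[3]   # 2.0 -- stored value
    sv[4]   # 0.0 -- past the last stored index; previously raised IndexError
    sv[-1]  # 0.0 -- negative indices count from the end (index 4 here)
    # sv[5] still raises ValueError("Index 5 out of bounds.")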
2 changes: 1 addition & 1 deletion python/pyspark/mllib/regression.py
@@ -669,7 +669,7 @@ def update(rdd):
self._model = LinearRegressionWithSGD.train(
rdd, self.numIterations, self.stepSize,
self.miniBatchFraction, self._model.weights,
intercept=self._model.intercept, convergenceTol=self.convergenceTol)
intercept=self._model.intercept)

dstream.foreachRDD(update)

20 changes: 11 additions & 9 deletions python/pyspark/mllib/tests.py
@@ -205,15 +205,17 @@ def test_conversion(self):
self.assertTrue(dv.array.dtype == 'float64')

def test_sparse_vector_indexing(self):
sv = SparseVector(4, {1: 1, 3: 2})
self.assertEquals(sv[0], 0.)
self.assertEquals(sv[3], 2.)
self.assertEquals(sv[1], 1.)
self.assertEquals(sv[2], 0.)
self.assertEquals(sv[-1], 2)
self.assertEquals(sv[-2], 0)
self.assertEquals(sv[-4], 0)
for ind in [4, -5]:
sv = SparseVector(5, {1: 1, 3: 2})
self.assertEqual(sv[0], 0.)
self.assertEqual(sv[3], 2.)
self.assertEqual(sv[1], 1.)
self.assertEqual(sv[2], 0.)
self.assertEqual(sv[4], 0.)
self.assertEqual(sv[-1], 0.)
self.assertEqual(sv[-2], 2.)
self.assertEqual(sv[-3], 0.)
self.assertEqual(sv[-5], 0.)
for ind in [5, -6]:
self.assertRaises(ValueError, sv.__getitem__, ind)
for ind in [7.8, '1']:
self.assertRaises(TypeError, sv.__getitem__, ind)
9 changes: 9 additions & 0 deletions python/pyspark/sql/functions.py
@@ -30,6 +30,7 @@
from pyspark.sql import since
from pyspark.sql.types import StringType
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.dataframe import DataFrame


def _create_function(name, doc=""):
@@ -190,6 +191,14 @@ def approxCountDistinct(col, rsd=None):
return Column(jc)


@since(1.6)
def broadcast(df):
"""Marks a DataFrame as small enough for use in broadcast joins."""

sc = SparkContext._active_spark_context
return DataFrame(sc._jvm.functions.broadcast(df._jdf), df.sql_ctx)


@since(1.4)
def coalesce(*cols):
"""Returns the first column that is not null.
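The new broadcast() helper simply forwards to the JVM-side functions.broadcast, marking the wrapped DataFrame as a candidate for a broadcast join. A minimal usage sketch (data and names are illustrative; assumes a running SQLContext named sqlContext), matching the test added in the next file:

    from pyspark.sql.functions import broadcast

    large = sqlContext.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["key", "value"])
    small = sqlContext.createDataFrame([(1, "x"), (2, "y")], ["key", "other"])

    # hint that `small` fits in memory so the equi-join can become a broadcast hash join
    joined = large.join(broadcast(small), "key")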
27 changes: 27 additions & 0 deletions python/pyspark/sql/tests.py
@@ -1056,6 +1056,33 @@ def test_with_column_with_existing_name(self):
keys = self.df.withColumn("key", self.df.key).select("key").collect()
self.assertEqual([r.key for r in keys], list(range(100)))

# regression test for SPARK-10417
def test_column_iterator(self):

def foo():
for x in self.df.key:
break

self.assertRaises(TypeError, foo)

# add test for SPARK-10577 (test broadcast join hint)
def test_functions_broadcast(self):
from pyspark.sql.functions import broadcast

df1 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
df2 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))

# equijoin - should be converted into broadcast join
plan1 = df1.join(broadcast(df2), "key")._jdf.queryExecution().executedPlan()
self.assertEqual(1, plan1.toString().count("BroadcastHashJoin"))

# no join key -- should not be a broadcast join
plan2 = df1.join(broadcast(df2))._jdf.queryExecution().executedPlan()
self.assertEqual(0, plan2.toString().count("BroadcastHashJoin"))

# planner should not crash without a join
broadcast(df1)._jdf.queryExecution().executedPlan()


class HiveContextSQLTests(ReusedPySparkTestCase):

@@ -825,6 +825,10 @@ class Analyzer(
val withName = Alias(agg, s"_w${extractedExprBuffer.length}")()
extractedExprBuffer += withName
withName.toAttribute

// Extracts other attributes
case attr: Attribute => extractExpr(attr)

}.asInstanceOf[NamedExpression]
}

@@ -1625,7 +1625,7 @@ class DataFrame private[sql](
*/
@deprecated("Use write.jdbc()", "1.4.0")
def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = {
val w = if (overwrite) write.mode(SaveMode.Overwrite) else write
val w = if (overwrite) write.mode(SaveMode.Overwrite) else write.mode(SaveMode.Append)
w.jdbc(url, table, new Properties)
}

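With SaveMode.Append on the non-overwrite branch, the deprecated insertIntoJDBC now appends to an existing table instead of using the writer's default error-if-exists mode. A hedged PySpark sketch of the write.jdbc() path that the deprecation message recommends (URL, table, and credentials are placeholders):

    # `df` is an existing DataFrame; connection details below are placeholders
    df.write.mode("append").jdbc(
        "jdbc:postgresql://localhost/testdb",
        "target_table",
        properties={"user": "spark", "password": "secret"})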
32 changes: 13 additions & 19 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -145,13 +145,10 @@ case class Window(
// Construct the ordering. This is used to compare the result of current value projection
// to the result of bound value projection. This is done manually because we want to use
// Code Generation (if it is enabled).
val (sortExprs, schema) = exprs.map { case e =>
// This AttributeReference does not need to have unique IDs, it's OK to be called
// in executor.
val ref = AttributeReference("ordExpr", e.dataType, e.nullable)()
(SortOrder(ref, e.direction), ref)
}.unzip
val ordering = newOrdering(sortExprs, schema)
val sortExprs = exprs.zipWithIndex.map { case (e, i) =>
SortOrder(BoundReference(i, e.dataType, e.nullable), e.direction)
}
val ordering = newOrdering(sortExprs, Nil)
RangeBoundOrdering(ordering, current, bound)
case RowFrame => RowBoundOrdering(offset)
}
@@ -203,17 +200,19 @@ case class Window(
* This method uses Code Generation. It can only be used on the executor side.
*
* @param expressions unbound ordered function expressions.
* @param attributes output attributes
* @return the final resulting projection.
*/
private[this] def createResultProjection(
expressions: Seq[Expression],
attributes: Seq[Attribute]): MutableProjection = {
val unboundToAttrMap = expressions.zip(attributes).toMap
val patchedWindowExpression = windowExpression.map(_.transform(unboundToAttrMap))
expressions: Seq[Expression]): MutableProjection = {
val references = expressions.zipWithIndex.map{ case (e, i) =>
// Results of window expressions will be on the right side of child's output
BoundReference(child.output.size + i, e.dataType, e.nullable)
}
val unboundToRefMap = expressions.zip(references).toMap
val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
newMutableProjection(
projectList ++ patchedWindowExpression,
child.output ++ attributes)()
child.output)()
}

protected override def doExecute(): RDD[InternalRow] = {
@@ -248,17 +247,12 @@ case class Window(
factories(index) = () => createFrameProcessor(frame, functions, ordinal)
}

// AttributeReference can only be created in driver, or the id will not be unique
val outputAttributes = unboundExpressions.map {
e => AttributeReference("windowResult", e.dataType, e.nullable)()
}

// Start processing.
child.execute().mapPartitions { stream =>
new Iterator[InternalRow] {

// Get all relevant projections.
val result = createResultProjection(unboundExpressions, outputAttributes)
val result = createResultProjection(unboundExpressions)
val grouping = if (child.outputsUnsafeRows) {
UnsafeProjection.create(partitionSpec, child.output)
} else {
10 changes: 10 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1756,4 +1756,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
df1.withColumn("diff", lit(0)))
}
}

test("SPARK-10389: order by non-attribute grouping expression on Aggregate") {
withTempTable("src") {
Seq((1, 1), (-1, 1)).toDF("key", "value").registerTempTable("src")
checkAnswer(sql("SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY key + 1"),
Seq(Row(1), Row(1)))
checkAnswer(sql("SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY (key + 1) * 2"),
Seq(Row(1), Row(1)))
}
}
}
@@ -55,7 +55,6 @@ object HiveThriftServer2 extends Logging {
@DeveloperApi
def startWithContext(sqlContext: HiveContext): Unit = {
val server = new HiveThriftServer2(sqlContext)
sqlContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
server.init(sqlContext.hiveconf)
server.start()
listener = new HiveThriftServer2Listener(server, sqlContext.conf)