diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index ed304c2a481bf..ffb6dbd702cc2 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1352,9 +1352,10 @@ setMethod("where", #' @param x A Spark DataFrame #' @param y A Spark DataFrame #' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a -#' Column expression. If joinExpr is omitted, join() wil perform a Cartesian join +#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join #' @param joinType The type of join to perform. The following join types are available: -#' 'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'. The default joinType is "inner". +#' 'inner', 'outer', 'full', 'fullouter', leftouter', 'left_outer', 'left', +#' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner". #' @return A DataFrame containing the result of the join operation. #' @rdname join #' @name join @@ -1379,11 +1380,15 @@ setMethod("join", if (is.null(joinType)) { sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc) } else { - if (joinType %in% c("inner", "outer", "left_outer", "right_outer", "semijoin")) { + if (joinType %in% c("inner", "outer", "full", "fullouter", + "leftouter", "left_outer", "left", + "rightouter", "right_outer", "right", "leftsemi")) { + joinType <- gsub("_", "", joinType) sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType) } else { stop("joinType must be one of the following types: ", - "'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'") + "'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left', + 'rightouter', 'right_outer', 'right', 'leftsemi'") } } } diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index aedc385c4ff4d..63513d9f380c1 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -952,7 +952,7 @@ test_that("join() and merge() on a DataFrame", { expect_equal(names(joined2), c("age", "name", "name", "test")) expect_equal(count(joined2), 3) - joined3 <- join(df, df2, df$name == df2$name, "right_outer") + joined3 <- join(df, df2, df$name == df2$name, "rightouter") expect_equal(names(joined3), c("age", "name", "name", "test")) expect_equal(count(joined3), 4) expect_true(is.na(collect(orderBy(joined3, joined3$age))$age[2])) @@ -963,11 +963,34 @@ test_that("join() and merge() on a DataFrame", { expect_equal(count(joined4), 4) expect_equal(collect(orderBy(joined4, joined4$name))$newAge[3], 24) + joined5 <- join(df, df2, df$name == df2$name, "leftouter") + expect_equal(names(joined5), c("age", "name", "name", "test")) + expect_equal(count(joined5), 3) + expect_true(is.na(collect(orderBy(joined5, joined5$age))$age[1])) + + joined6 <- join(df, df2, df$name == df2$name, "inner") + expect_equal(names(joined6), c("age", "name", "name", "test")) + expect_equal(count(joined6), 3) + + joined7 <- join(df, df2, df$name == df2$name, "leftsemi") + expect_equal(names(joined7), c("age", "name")) + expect_equal(count(joined7), 3) + + joined8 <- join(df, df2, df$name == df2$name, "left_outer") + expect_equal(names(joined8), c("age", "name", "name", "test")) + expect_equal(count(joined8), 3) + expect_true(is.na(collect(orderBy(joined8, joined8$age))$age[1])) + + joined9 <- join(df, df2, df$name == df2$name, "right_outer") + expect_equal(names(joined9), c("age", "name", "name", "test")) + expect_equal(count(joined9), 4) + expect_true(is.na(collect(orderBy(joined9, joined9$age))$age[2])) + merged <- select(merge(df, df2, df$name == df2$name, "outer"), alias(df$age + 5, "newAge"), df$name, df2$test) expect_equal(names(merged), c("newAge", "name", "test")) expect_equal(count(merged), 4) - expect_equal(collect(orderBy(merged, joined4$name))$newAge[3], 24) + expect_equal(collect(orderBy(merged, merged$name))$newAge[3], 24) }) test_that("toJSON() returns an RDD of the correct values", { diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 21dc8f0b65485..68a9f912a5d2c 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -31,6 +31,7 @@ import org.apache.spark.ui.scope.RDDOperationGraph private[spark] object UIUtils extends Logging { val TABLE_CLASS_NOT_STRIPED = "table table-bordered table-condensed" val TABLE_CLASS_STRIPED = TABLE_CLASS_NOT_STRIPED + " table-striped" + val TABLE_CLASS_STRIPED_SORTABLE = TABLE_CLASS_STRIPED + " sortable" // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. private val dateFormat = new ThreadLocal[SimpleDateFormat]() { diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 01cddda4c62cd..1a29b0f412603 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -62,7 +62,7 @@ private[ui] class ExecutorsPage( val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty val execTable = - +
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index d5cdbfac104f8..be144f6065baa 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -50,7 +50,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage hasBytesSpilled = data.hasBytesSpilled }) -
Executor ID Address
+
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 8313312226713..2dd2c036c9307 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1751,6 +1751,13 @@ private[spark] object Utils extends Logging { if (uri.getScheme() != null) { return uri } + // make sure to handle if the path has a fragment (applies to yarn + // distributed cache) + if (uri.getFragment() != null) { + val absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI() + return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(), + uri.getFragment()) + } } catch { case e: URISyntaxException => } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 1fb81ad565b41..68b0da76bc134 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -384,7 +384,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assertResolves("hdfs:/root/spark.jar", "hdfs:/root/spark.jar") assertResolves("hdfs:///root/spark.jar#app.jar", "hdfs:/root/spark.jar#app.jar") assertResolves("spark.jar", s"file:$cwd/spark.jar") - assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar%23app.jar") + assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar#app.jar") assertResolves("path to/file.txt", s"file:$cwd/path%20to/file.txt") if (Utils.isWindows) { assertResolves("C:\\path\\to\\file.txt", "file:/C:/path/to/file.txt") @@ -414,10 +414,10 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2") assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3") assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5,path to/jar6", - s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4%23jar5,file:$cwd/path%20to/jar6") + s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5,file:$cwd/path%20to/jar6") if (Utils.isWindows) { assertResolves("""hdfs:/jar1,file:/jar2,jar3,C:\pi.py#py.pi,C:\path to\jar4""", - s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py%23py.pi,file:/C:/path%20to/jar4") + s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi,file:/C:/path%20to/jar4") } } diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index 9dac43ce54425..cb79e9eba06e2 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -70,7 +70,7 @@ GIT_REF=${GIT_REF:-master} # Destination directory parent on remote server REMOTE_PARENT_DIR=${REMOTE_PARENT_DIR:-/home/$ASF_USERNAME/public_html} -SSH="ssh -o StrictHostKeyChecking=no -i $ASF_RSA_KEY" +SSH="ssh -o ConnectTimeout=300 -o StrictHostKeyChecking=no -i $ASF_RSA_KEY" GPG="gpg --no-tty --batch" NEXUS_ROOT=https://repository.apache.org/service/local/staging NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging uploads @@ -141,8 +141,12 @@ if [[ "$1" == "package" ]]; then export ZINC_PORT=$ZINC_PORT echo "Creating distribution: $NAME ($FLAGS)" - ./make-distribution.sh --name $NAME --tgz $FLAGS -DzincPort=$ZINC_PORT 2>&1 > \ - ../binary-release-$NAME.log + + # Get maven home set by MVN + MVN_HOME=`$MVN -version 2>&1 | grep 'Maven home' | awk '{print $NF}'` + + ./make-distribution.sh --name $NAME --mvn $MVN_HOME/bin/mvn --tgz $FLAGS \ + -DzincPort=$ZINC_PORT 2>&1 > ../binary-release-$NAME.log cd .. cp spark-$SPARK_VERSION-bin-$NAME/spark-$SPARK_VERSION-bin-$NAME.tgz . diff --git a/docs/building-spark.md b/docs/building-spark.md index 4db32cfd628bc..de9729f0c00a5 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -205,6 +205,11 @@ can be set to control the SBT build. For example: build/sbt -Pyarn -Phadoop-2.3 assembly +To avoid the overhead of launching sbt each time you need to re-compile, you can launch sbt +in interactive mode by running `build/sbt`, and then run all build commands at the command +prompt. For more recommendations on reducing build time, refer to the +[wiki page](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-ReducingBuildTimes). + # Testing with SBT Some of the tests require Spark to be packaged first, so always run `build/sbt assembly` the first time. The following is an example of a correct (build, test) sequence: diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index a302ed8ec2435..e4500a0bc8315 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -633,7 +633,7 @@ def update(rdd): self._model = LogisticRegressionWithSGD.train( rdd, self.numIterations, self.stepSize, self.miniBatchFraction, self._model.weights, - regParam=self.regParam, convergenceTol=self.convergenceTol) + regParam=self.regParam) dstream.foreachRDD(update) diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index 30d05d0ec3609..2db2dec275bd9 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -732,6 +732,9 @@ def __getitem__(self, index): raise ValueError("Index %d out of bounds." % index) insert_index = np.searchsorted(inds, index) + if insert_index >= inds.size: + return 0. + row_ind = inds[insert_index] if row_ind == index: return vals[insert_index] diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 562fb36158043..6bbac026507c2 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -669,7 +669,7 @@ def update(rdd): self._model = LinearRegressionWithSGD.train( rdd, self.numIterations, self.stepSize, self.miniBatchFraction, self._model.weights, - intercept=self._model.intercept, convergenceTol=self.convergenceTol) + intercept=self._model.intercept) dstream.foreachRDD(update) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 5097c5e8ba4cd..4d471443d192c 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -205,15 +205,17 @@ def test_conversion(self): self.assertTrue(dv.array.dtype == 'float64') def test_sparse_vector_indexing(self): - sv = SparseVector(4, {1: 1, 3: 2}) - self.assertEquals(sv[0], 0.) - self.assertEquals(sv[3], 2.) - self.assertEquals(sv[1], 1.) - self.assertEquals(sv[2], 0.) - self.assertEquals(sv[-1], 2) - self.assertEquals(sv[-2], 0) - self.assertEquals(sv[-4], 0) - for ind in [4, -5]: + sv = SparseVector(5, {1: 1, 3: 2}) + self.assertEqual(sv[0], 0.) + self.assertEqual(sv[3], 2.) + self.assertEqual(sv[1], 1.) + self.assertEqual(sv[2], 0.) + self.assertEqual(sv[4], 0.) + self.assertEqual(sv[-1], 0.) + self.assertEqual(sv[-2], 2.) + self.assertEqual(sv[-3], 0.) + self.assertEqual(sv[-5], 0.) + for ind in [5, -6]: self.assertRaises(ValueError, sv.__getitem__, ind) for ind in [7.8, '1']: self.assertRaises(TypeError, sv.__getitem__, ind) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 4b74a501521a5..3c631a0328dee 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -30,6 +30,7 @@ from pyspark.sql import since from pyspark.sql.types import StringType from pyspark.sql.column import Column, _to_java_column, _to_seq +from pyspark.sql.dataframe import DataFrame def _create_function(name, doc=""): @@ -190,6 +191,14 @@ def approxCountDistinct(col, rsd=None): return Column(jc) +@since(1.6) +def broadcast(df): + """Marks a DataFrame as small enough for use in broadcast joins.""" + + sc = SparkContext._active_spark_context + return DataFrame(sc._jvm.functions.broadcast(df._jdf), df.sql_ctx) + + @since(1.4) def coalesce(*cols): """Returns the first column that is not null. diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 6b647f3aacfa1..14414b3d566f8 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1056,6 +1056,33 @@ def test_with_column_with_existing_name(self): keys = self.df.withColumn("key", self.df.key).select("key").collect() self.assertEqual([r.key for r in keys], list(range(100))) + # regression test for SPARK-10417 + def test_column_iterator(self): + + def foo(): + for x in self.df.key: + break + + self.assertRaises(TypeError, foo) + + # add test for SPARK-10577 (test broadcast join hint) + def test_functions_broadcast(self): + from pyspark.sql.functions import broadcast + + df1 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value")) + df2 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value")) + + # equijoin - should be converted into broadcast join + plan1 = df1.join(broadcast(df2), "key")._jdf.queryExecution().executedPlan() + self.assertEqual(1, plan1.toString().count("BroadcastHashJoin")) + + # no join key -- should not be a broadcast join + plan2 = df1.join(broadcast(df2))._jdf.queryExecution().executedPlan() + self.assertEqual(0, plan2.toString().count("BroadcastHashJoin")) + + # planner should not crash without a join + broadcast(df1)._jdf.queryExecution().executedPlan() + class HiveContextSQLTests(ReusedPySparkTestCase): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index aacfbf9af760c..47db44880f531 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -825,6 +825,10 @@ class Analyzer( val withName = Alias(agg, s"_w${extractedExprBuffer.length}")() extractedExprBuffer += withName withName.toAttribute + + // Extracts other attributes + case attr: Attribute => extractExpr(attr) + }.asInstanceOf[NamedExpression] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index f024db7ae2cdd..3fce47fa655d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1625,7 +1625,7 @@ class DataFrame private[sql]( */ @deprecated("Use write.jdbc()", "1.4.0") def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = { - val w = if (overwrite) write.mode(SaveMode.Overwrite) else write + val w = if (overwrite) write.mode(SaveMode.Overwrite) else write.mode(SaveMode.Append) w.jdbc(url, table, new Properties) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 7e204ec88aa1f..55035f4bc5f2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -145,13 +145,10 @@ case class Window( // Construct the ordering. This is used to compare the result of current value projection // to the result of bound value projection. This is done manually because we want to use // Code Generation (if it is enabled). - val (sortExprs, schema) = exprs.map { case e => - // This AttributeReference does not need to have unique IDs, it's OK to be called - // in executor. - val ref = AttributeReference("ordExpr", e.dataType, e.nullable)() - (SortOrder(ref, e.direction), ref) - }.unzip - val ordering = newOrdering(sortExprs, schema) + val sortExprs = exprs.zipWithIndex.map { case (e, i) => + SortOrder(BoundReference(i, e.dataType, e.nullable), e.direction) + } + val ordering = newOrdering(sortExprs, Nil) RangeBoundOrdering(ordering, current, bound) case RowFrame => RowBoundOrdering(offset) } @@ -203,17 +200,19 @@ case class Window( * This method uses Code Generation. It can only be used on the executor side. * * @param expressions unbound ordered function expressions. - * @param attributes output attributes * @return the final resulting projection. */ private[this] def createResultProjection( - expressions: Seq[Expression], - attributes: Seq[Attribute]): MutableProjection = { - val unboundToAttrMap = expressions.zip(attributes).toMap - val patchedWindowExpression = windowExpression.map(_.transform(unboundToAttrMap)) + expressions: Seq[Expression]): MutableProjection = { + val references = expressions.zipWithIndex.map{ case (e, i) => + // Results of window expressions will be on the right side of child's output + BoundReference(child.output.size + i, e.dataType, e.nullable) + } + val unboundToRefMap = expressions.zip(references).toMap + val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap)) newMutableProjection( projectList ++ patchedWindowExpression, - child.output ++ attributes)() + child.output)() } protected override def doExecute(): RDD[InternalRow] = { @@ -248,17 +247,12 @@ case class Window( factories(index) = () => createFrameProcessor(frame, functions, ordinal) } - // AttributeReference can only be created in driver, or the id will not be unique - val outputAttributes = unboundExpressions.map { - e => AttributeReference("windowResult", e.dataType, e.nullable)() - } - // Start processing. child.execute().mapPartitions { stream => new Iterator[InternalRow] { // Get all relevant projections. - val result = createResultProjection(unboundExpressions, outputAttributes) + val result = createResultProjection(unboundExpressions) val grouping = if (child.outputsUnsafeRows) { UnsafeProjection.create(partitionSpec, child.output) } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index eb5bffc18f846..223a2fbfbe020 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1756,4 +1756,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { df1.withColumn("diff", lit(0))) } } + + test("SPARK-10389: order by non-attribute grouping expression on Aggregate") { + withTempTable("src") { + Seq((1, 1), (-1, 1)).toDF("key", "value").registerTempTable("src") + checkAnswer(sql("SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY key + 1"), + Seq(Row(1), Row(1))) + checkAnswer(sql("SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY (key + 1) * 2"), + Seq(Row(1), Row(1))) + } + } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index a0643cec0fb7c..a4fd0c3ce9702 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -55,7 +55,6 @@ object HiveThriftServer2 extends Logging { @DeveloperApi def startWithContext(sqlContext: HiveContext): Unit = { val server = new HiveThriftServer2(sqlContext) - sqlContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) server.init(sqlContext.hiveconf) server.start() listener = new HiveThriftServer2Listener(server, sqlContext.conf) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index b72249b3bf8c0..19b2f24456ab0 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -21,6 +21,7 @@ import java.io.File import java.net.URL import java.sql.{Date, DriverManager, SQLException, Statement} +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ @@ -431,6 +432,32 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } ) } + + test("Checks Hive version via SET -v") { + withJdbcStatement { statement => + val resultSet = statement.executeQuery("SET -v") + + val conf = mutable.Map.empty[String, String] + while (resultSet.next()) { + conf += resultSet.getString(1) -> resultSet.getString(2) + } + + assert(conf.get("spark.sql.hive.version") === Some("1.2.1")) + } + } + + test("Checks Hive version via SET") { + withJdbcStatement { statement => + val resultSet = statement.executeQuery("SET") + + val conf = mutable.Map.empty[String, String] + while (resultSet.next()) { + conf += resultSet.getString(1) -> resultSet.getString(2) + } + + assert(conf.get("spark.sql.hive.version") === Some("1.2.1")) + } + } } class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 7af84057cee3b..8a6236ce018ff 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -648,6 +648,11 @@ private[hive] object HiveContext { doc = "Version of the Hive metastore. Available options are " + s"0.12.0 through $hiveExecutionVersion.") + val HIVE_EXECUTION_VERSION = stringConf( + key = "spark.sql.hive.version", + defaultValue = Some(hiveExecutionVersion), + doc = "Version of Hive used internally by Spark SQL.") + val HIVE_METASTORE_JARS = stringConf("spark.sql.hive.metastore.jars", defaultValue = Some("builtin"), doc = s""" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 1d5ee22e99e0a..788118b6e3082 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -29,7 +29,8 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ import org.apache.spark._ -import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.{SQLContext, QueryTest} +import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext} import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.sql.types.DecimalType @@ -107,6 +108,16 @@ class HiveSparkSubmitSuite runSparkSubmit(args) } + test("SPARK-11009 fix wrong result of Window function in cluster mode") { + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val args = Seq( + "--class", SPARK_11009.getClass.getName.stripSuffix("$"), + "--name", "SparkSQLConfTest", + "--master", "local-cluster[2,1,1024]", + unusedJar.toString) + runSparkSubmit(args) + } + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. // This is copied from org.apache.spark.deploy.SparkSubmitSuite private def runSparkSubmit(args: Seq[String]): Unit = { @@ -314,3 +325,33 @@ object SPARK_9757 extends QueryTest with Logging { } } } + +object SPARK_11009 extends QueryTest { + import org.apache.spark.sql.functions._ + + protected var sqlContext: SQLContext = _ + + def main(args: Array[String]): Unit = { + Utils.configTestLog4j("INFO") + + val sparkContext = new SparkContext( + new SparkConf() + .set("spark.ui.enabled", "false") + .set("spark.sql.shuffle.partitions", "100")) + + val hiveContext = new TestHiveContext(sparkContext) + sqlContext = hiveContext + + try { + val df = sqlContext.range(1 << 20) + val df2 = df.select((df("id") % 1000).alias("A"), (df("id") / 1000).alias("B")) + val ws = Window.partitionBy(df2("A")).orderBy(df2("B")) + val df3 = df2.select(df2("A"), df2("B"), rowNumber().over(ws).alias("rn")).filter("rn < 0") + if (df3.rdd.count() != 0) { + throw new Exception("df3 should have 0 output row.") + } + } finally { + sparkContext.stop() + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 72038f70df070..c98fddb3cb47b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -835,6 +835,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { ).map(i => Row(i._1, i._2, i._3))) } + test("window function: refer column in inner select block") { + val data = Seq( + WindowData(1, "a", 5), + WindowData(2, "a", 6), + WindowData(3, "b", 7), + WindowData(4, "b", 8), + WindowData(5, "c", 9), + WindowData(6, "c", 10) + ) + sparkContext.parallelize(data).toDF().registerTempTable("windowData") + + checkAnswer( + sql( + """ + |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1 + |from (select month, area, product, 1 as tmp1 from windowData) tmp + """.stripMargin), + Seq( + ("a", 2), + ("a", 3), + ("b", 2), + ("b", 3), + ("c", 2), + ("c", 3) + ).map(i => Row(i._1, i._2))) + } + test("window function: partition and order expressions") { val data = Seq( WindowData(1, "a", 5), diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala index 1b717b64542d5..a19b85a51d289 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala @@ -443,7 +443,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { } def generateInputMetadataTable(inputMetadatas: Seq[(Int, String)]): Seq[Node] = { -
Executor ID Address
+
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 28228c2656a5c..f21f5efbfb196 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -318,7 +318,8 @@ private[spark] class Client( destName: Option[String] = None, targetDir: Option[String] = None, appMasterOnly: Boolean = false): (Boolean, String) = { - val localURI = new URI(path.trim()) + val trimmedPath = path.trim() + val localURI = Utils.resolveURI(trimmedPath) if (localURI.getScheme != LOCAL_SCHEME) { if (addDistributedUri(localURI)) { val localPath = getQualifiedLocalPath(localURI, hadoopConf) @@ -334,7 +335,7 @@ private[spark] class Client( (false, null) } } else { - (true, path.trim()) + (true, trimmedPath) } } @@ -555,10 +556,10 @@ private[spark] class Client( LOCALIZED_PYTHON_DIR) } (pySparkArchives ++ pyArchives).foreach { path => - val uri = new URI(path) + val uri = Utils.resolveURI(path) if (uri.getScheme != LOCAL_SCHEME) { pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - new Path(path).getName()) + new Path(uri).getName()) } else { pythonPath += uri.getPath() } @@ -1143,7 +1144,7 @@ object Client extends Logging { } else { getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR)) } - mainJar.foreach(addFileToClasspath(sparkConf, _, APP_JAR, env)) + mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR, env)) val secondaryJars = if (args != null) { @@ -1152,10 +1153,10 @@ object Client extends Logging { getSecondaryJarUris(sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS)) } secondaryJars.foreach { x => - addFileToClasspath(sparkConf, x, null, env) + addFileToClasspath(sparkConf, conf, x, null, env) } } - addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env) + addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR, env) populateHadoopClasspath(conf, env) sys.env.get(ENV_DIST_CLASSPATH).foreach { cp => addClasspathEntry(getClusterPath(sparkConf, cp), env) @@ -1175,7 +1176,7 @@ object Client extends Logging { private def getMainJarUri(mainJar: Option[String]): Option[URI] = { mainJar.flatMap { path => - val uri = new URI(path) + val uri = Utils.resolveURI(path) if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None }.orElse(Some(new URI(APP_JAR))) } @@ -1190,15 +1191,17 @@ object Client extends Logging { * If an alternate name for the file is given, and it's not a "local:" file, the alternate * name will be added to the classpath (relative to the job's work directory). * - * If not a "local:" file and no alternate name, the environment is not modified. + * If not a "local:" file and no alternate name, the linkName will be added to the classpath. * - * @param conf Spark configuration. - * @param uri URI to add to classpath (optional). - * @param fileName Alternate name for the file (optional). - * @param env Map holding the environment variables. + * @param conf Spark configuration. + * @param hadoopConf Hadoop configuration. + * @param uri URI to add to classpath (optional). + * @param fileName Alternate name for the file (optional). + * @param env Map holding the environment variables. */ private def addFileToClasspath( conf: SparkConf, + hadoopConf: Configuration, uri: URI, fileName: String, env: HashMap[String, String]): Unit = { @@ -1207,6 +1210,11 @@ object Client extends Logging { } else if (fileName != null) { addClasspathEntry(buildPath( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env) + } else if (uri != null) { + val localPath = getQualifiedLocalPath(uri, hadoopConf) + val linkName = Option(uri.getFragment()).getOrElse(localPath.getName()) + addClasspathEntry(buildPath( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env) } }
Input