From 156ac279014872de976246bb252dd28df5ab29d6 Mon Sep 17 00:00:00 2001 From: Tom Graves Date: Fri, 9 Oct 2015 14:06:25 -0700 Subject: [PATCH 01/17] [SPARK-10858] YARN: archives/jar/files rename with # doesn't work unl https://issues.apache.org/jira/browse/SPARK-10858 The issue here is that in resolveURI we default to calling new File(path).getAbsoluteFile().toURI(). But if the path passed in already has a # in it then File(path) will think that is supposed to be part of the actual file path and not a fragment so it changes # to %23. Then when we try to parse that later in Client as a URI it doesn't recognize there is a fragment. so to fix we just check if there is a fragment, still create the File like we did before and then add the fragment back on. Author: Tom Graves Closes #9035 from tgravescs/SPARK-10858. (cherry picked from commit 63c340a710b24869410d56602b712fbfe443e6f0) --- core/src/main/scala/org/apache/spark/util/Utils.scala | 7 +++++++ core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 6 +++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 8313312226713..2dd2c036c9307 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1751,6 +1751,13 @@ private[spark] object Utils extends Logging { if (uri.getScheme() != null) { return uri } + // make sure to handle if the path has a fragment (applies to yarn + // distributed cache) + if (uri.getFragment() != null) { + val absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI() + return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(), + uri.getFragment()) + } } catch { case e: URISyntaxException => } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 1fb81ad565b41..68b0da76bc134 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -384,7 +384,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assertResolves("hdfs:/root/spark.jar", "hdfs:/root/spark.jar") assertResolves("hdfs:///root/spark.jar#app.jar", "hdfs:/root/spark.jar#app.jar") assertResolves("spark.jar", s"file:$cwd/spark.jar") - assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar%23app.jar") + assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar#app.jar") assertResolves("path to/file.txt", s"file:$cwd/path%20to/file.txt") if (Utils.isWindows) { assertResolves("C:\\path\\to\\file.txt", "file:/C:/path/to/file.txt") @@ -414,10 +414,10 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2") assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3") assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5,path to/jar6", - s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4%23jar5,file:$cwd/path%20to/jar6") + s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5,file:$cwd/path%20to/jar6") if (Utils.isWindows) { assertResolves("""hdfs:/jar1,file:/jar2,jar3,C:\pi.py#py.pi,C:\path to\jar4""", - s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py%23py.pi,file:/C:/path%20to/jar4") + s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi,file:/C:/path%20to/jar4") } } From 
6dc23e626ba82102beaa3b6550c5ef63777b5df9 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 12 Oct 2015 09:16:14 -0700 Subject: [PATCH 02/17] [SPARK-10960] [SQL] SQL with windowing function should be able to refer column in inner select JIRA: https://issues.apache.org/jira/browse/SPARK-10960 When accessing a column in inner select from a select with window function, `AnalysisException` will be thrown. For example, an query like this: select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1 from (select month, area, product, 1 as tmp1 from windowData) tmp Currently, the rule `ExtractWindowExpressions` in `Analyzer` only extracts regular expressions from `WindowFunction`, `WindowSpecDefinition` and `AggregateExpression`. We need to also extract other attributes as the one in `Alias` as shown in the above query. Author: Liang-Chi Hsieh Closes #9011 from viirya/fix-window-inner-column. (cherry picked from commit fcb37a04177edc2376e39dd0b910f0268f7c72ec) Signed-off-by: Yin Huai --- .../sql/catalyst/analysis/Analyzer.scala | 4 +++ .../sql/hive/execution/SQLQuerySuite.scala | 27 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e371f639f318f..6e7353fdbdfdf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -823,6 +823,10 @@ class Analyzer( val withName = Alias(agg, s"_w${extractedExprBuffer.length}")() extractedExprBuffer += withName withName.toAttribute + + // Extracts other attributes + case attr: Attribute => extractExpr(attr) + }.asInstanceOf[NamedExpression] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 72038f70df070..c98fddb3cb47b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -835,6 +835,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { ).map(i => Row(i._1, i._2, i._3))) } + test("window function: refer column in inner select block") { + val data = Seq( + WindowData(1, "a", 5), + WindowData(2, "a", 6), + WindowData(3, "b", 7), + WindowData(4, "b", 8), + WindowData(5, "c", 9), + WindowData(6, "c", 10) + ) + sparkContext.parallelize(data).toDF().registerTempTable("windowData") + + checkAnswer( + sql( + """ + |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1 + |from (select month, area, product, 1 as tmp1 from windowData) tmp + """.stripMargin), + Seq( + ("a", 2), + ("a", 3), + ("b", 2), + ("b", 3), + ("c", 2), + ("c", 3) + ).map(i => Row(i._1, i._2))) + } + test("window function: partition and order expressions") { val data = Seq( WindowData(1, "a", 5), From cd55fbcc2757264f92a1618ef4a8908c2c781bc6 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 12 Oct 2015 10:21:57 -0700 Subject: [PATCH 03/17] [SPARK-11023] [YARN] Avoid creating URIs from local paths directly. The issue is that local paths on Windows, when provided with drive letters or backslashes, are not valid URIs. Instead of trying to figure out whether paths are URIs or not, use Utils.resolveURI() which does that for us. 
Author: Marcelo Vanzin Closes #9049 from vanzin/SPARK-11023 and squashes the following commits: 77021f2 [Marcelo Vanzin] [SPARK-11023] [yarn] Avoid creating URIs from local paths directly. (cherry picked from commit 149472a01d12828c64b0a852982d48c123984182) --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 28228c2656a5c..f2e1c2b79f516 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -318,7 +318,8 @@ private[spark] class Client( destName: Option[String] = None, targetDir: Option[String] = None, appMasterOnly: Boolean = false): (Boolean, String) = { - val localURI = new URI(path.trim()) + val trimmedPath = path.trim() + val localURI = Utils.resolveURI(trimmedPath) if (localURI.getScheme != LOCAL_SCHEME) { if (addDistributedUri(localURI)) { val localPath = getQualifiedLocalPath(localURI, hadoopConf) @@ -334,7 +335,7 @@ private[spark] class Client( (false, null) } } else { - (true, path.trim()) + (true, trimmedPath) } } @@ -555,10 +556,10 @@ private[spark] class Client( LOCALIZED_PYTHON_DIR) } (pySparkArchives ++ pyArchives).foreach { path => - val uri = new URI(path) + val uri = Utils.resolveURI(path) if (uri.getScheme != LOCAL_SCHEME) { pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), - new Path(path).getName()) + new Path(uri).getName()) } else { pythonPath += uri.getPath() } @@ -1175,7 +1176,7 @@ object Client extends Logging { private def getMainJarUri(mainJar: Option[String]): Option[URI] = { mainJar.flatMap { path => - val uri = new URI(path) + val uri = Utils.resolveURI(path) if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None }.orElse(Some(new URI(APP_JAR))) } From be68a4bcbfac3dad4e0279cf6ce099cd830a4507 Mon Sep 17 00:00:00 2001 From: zero323 Date: Mon, 12 Oct 2015 12:09:06 -0700 Subject: [PATCH 04/17] [SPARK-10973] [ML] [PYTHON] Fix IndexError exception on SparseVector when asked for index after the last non-zero entry See https://github.com/apache/spark/pull/9009 for details. Author: zero323 Closes #9064 from zero323/SPARK-10973_1.5. --- python/pyspark/mllib/linalg/__init__.py | 3 +++ python/pyspark/mllib/tests.py | 20 +++++++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index 30d05d0ec3609..2db2dec275bd9 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -732,6 +732,9 @@ def __getitem__(self, index): raise ValueError("Index %d out of bounds." % index) insert_index = np.searchsorted(inds, index) + if insert_index >= inds.size: + return 0. + row_ind = inds[insert_index] if row_ind == index: return vals[insert_index] diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 5097c5e8ba4cd..4d471443d192c 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -205,15 +205,17 @@ def test_conversion(self): self.assertTrue(dv.array.dtype == 'float64') def test_sparse_vector_indexing(self): - sv = SparseVector(4, {1: 1, 3: 2}) - self.assertEquals(sv[0], 0.) - self.assertEquals(sv[3], 2.) - self.assertEquals(sv[1], 1.) - self.assertEquals(sv[2], 0.) 
- self.assertEquals(sv[-1], 2) - self.assertEquals(sv[-2], 0) - self.assertEquals(sv[-4], 0) - for ind in [4, -5]: + sv = SparseVector(5, {1: 1, 3: 2}) + self.assertEqual(sv[0], 0.) + self.assertEqual(sv[3], 2.) + self.assertEqual(sv[1], 1.) + self.assertEqual(sv[2], 0.) + self.assertEqual(sv[4], 0.) + self.assertEqual(sv[-1], 0.) + self.assertEqual(sv[-2], 2.) + self.assertEqual(sv[-3], 0.) + self.assertEqual(sv[-5], 0.) + for ind in [5, -6]: self.assertRaises(ValueError, sv.__getitem__, ind) for ind in [7.8, '1']: self.assertRaises(TypeError, sv.__getitem__, ind) From ebbff391ac85cab0c0d65df1371340faeebe6f76 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Mon, 12 Oct 2015 14:23:29 -0700 Subject: [PATCH 05/17] [SPARK-11056] Improve documentation of SBT build. This commit improves the documentation around building Spark to (1) recommend using SBT interactive mode to avoid the overhead of launching SBT and (2) refer to the wiki page that documents using SPARK_PREPEND_CLASSES to avoid creating the assembly jar for each compile. cc srowen Author: Kay Ousterhout Closes #9068 from kayousterhout/SPARK-11056. (cherry picked from commit 091c2c3ecd69803d78c2b15a1487046701059d38) Signed-off-by: Kay Ousterhout --- docs/building-spark.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/building-spark.md b/docs/building-spark.md index 4db32cfd628bc..de9729f0c00a5 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -205,6 +205,11 @@ can be set to control the SBT build. For example: build/sbt -Pyarn -Phadoop-2.3 assembly +To avoid the overhead of launching sbt each time you need to re-compile, you can launch sbt +in interactive mode by running `build/sbt`, and then run all build commands at the command +prompt. For more recommendations on reducing build time, refer to the +[wiki page](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-ReducingBuildTimes). + # Testing with SBT Some of the tests require Spark to be packaged first, so always run `build/sbt assembly` the first time. The following is an example of a correct (build, test) sequence: From 2217f4f8b8fa6ad8b0b7200591a5e7c827cb6e18 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Mon, 12 Oct 2015 14:53:13 -0700 Subject: [PATCH 06/17] Revert "[SPARK-10959] [PYSPARK] StreamingLogisticRegressionWithSGD does not train with given regParam and convergenceTol parameters" This reverts commit f95129c17523ea60220a37576b8a9390943cf98e. 
--- python/pyspark/mllib/classification.py | 3 +-- python/pyspark/mllib/regression.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index a302ed8ec2435..8f27c446a66e8 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -632,8 +632,7 @@ def update(rdd): if not rdd.isEmpty(): self._model = LogisticRegressionWithSGD.train( rdd, self.numIterations, self.stepSize, - self.miniBatchFraction, self._model.weights, - regParam=self.regParam, convergenceTol=self.convergenceTol) + self.miniBatchFraction, self._model.weights) dstream.foreachRDD(update) diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 562fb36158043..41946e3674fbe 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -669,7 +669,7 @@ def update(rdd): self._model = LinearRegressionWithSGD.train( rdd, self.numIterations, self.stepSize, self.miniBatchFraction, self._model.weights, - intercept=self._model.intercept, convergenceTol=self.convergenceTol) + self._model.intercept) dstream.foreachRDD(update) From 47bc6c0fa3cfbd92bb4470240b0c97040217f370 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 13 Oct 2015 08:29:47 -0500 Subject: [PATCH 07/17] [SPARK-11026] [YARN] spark.yarn.user.classpath.first does work for 'spark-submit --jars hdfs://user/foo.jar' when spark.yarn.user.classpath.first=true and using 'spark-submit --jars hdfs://user/foo.jar', it can not put foo.jar to system classpath. so we need to put yarn's linkNames of jars to the system classpath. vanzin tgravescs Author: Lianhui Wang Closes #9045 from lianhuiwang/spark-11026. (cherry picked from commit 626aab79c9b4d4ac9d65bf5fa45b81dd9cbc609c) Signed-off-by: Tom Graves --- .../org/apache/spark/deploy/yarn/Client.scala | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index f2e1c2b79f516..f21f5efbfb196 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1144,7 +1144,7 @@ object Client extends Logging { } else { getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR)) } - mainJar.foreach(addFileToClasspath(sparkConf, _, APP_JAR, env)) + mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR, env)) val secondaryJars = if (args != null) { @@ -1153,10 +1153,10 @@ object Client extends Logging { getSecondaryJarUris(sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS)) } secondaryJars.foreach { x => - addFileToClasspath(sparkConf, x, null, env) + addFileToClasspath(sparkConf, conf, x, null, env) } } - addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env) + addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR, env) populateHadoopClasspath(conf, env) sys.env.get(ENV_DIST_CLASSPATH).foreach { cp => addClasspathEntry(getClusterPath(sparkConf, cp), env) @@ -1191,15 +1191,17 @@ object Client extends Logging { * If an alternate name for the file is given, and it's not a "local:" file, the alternate * name will be added to the classpath (relative to the job's work directory). * - * If not a "local:" file and no alternate name, the environment is not modified. + * If not a "local:" file and no alternate name, the linkName will be added to the classpath. 
* - * @param conf Spark configuration. - * @param uri URI to add to classpath (optional). - * @param fileName Alternate name for the file (optional). - * @param env Map holding the environment variables. + * @param conf Spark configuration. + * @param hadoopConf Hadoop configuration. + * @param uri URI to add to classpath (optional). + * @param fileName Alternate name for the file (optional). + * @param env Map holding the environment variables. */ private def addFileToClasspath( conf: SparkConf, + hadoopConf: Configuration, uri: URI, fileName: String, env: HashMap[String, String]): Unit = { @@ -1208,6 +1210,11 @@ object Client extends Logging { } else if (fileName != null) { addClasspathEntry(buildPath( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env) + } else if (uri != null) { + val localPath = getQualifiedLocalPath(uri, hadoopConf) + val linkName = Option(uri.getFragment()).getOrElse(localPath.getName()) + addClasspathEntry(buildPath( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env) } } From edc509586c3aee4b44e0659724d2e3d81d8518de Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 13 Oct 2015 09:40:36 -0700 Subject: [PATCH 08/17] [SPARK-11009] [SQL] fix wrong result of Window function in cluster mode Currently, All windows function could generate wrong result in cluster sometimes. The root cause is that AttributeReference is called in executor, then id of it may not be unique than others created in driver. Here is the script that could reproduce the problem (run in local cluster): ``` from pyspark import SparkContext, HiveContext from pyspark.sql.window import Window from pyspark.sql.functions import rowNumber sqlContext = HiveContext(SparkContext()) sqlContext.setConf("spark.sql.shuffle.partitions", "3") df = sqlContext.range(1<<20) df2 = df.select((df.id % 1000).alias("A"), (df.id / 1000).alias('B')) ws = Window.partitionBy(df2.A).orderBy(df2.B) df3 = df2.select("client", "date", rowNumber().over(ws).alias("rn")).filter("rn < 0") assert df3.count() == 0 ``` Author: Davies Liu Author: Yin Huai Closes #9050 from davies/wrong_window. Conflicts: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala --- .../apache/spark/sql/execution/Window.scala | 20 ++++----- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 43 ++++++++++++++++++- 2 files changed, 52 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index f8929530c5036..55035f4bc5f2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -145,11 +145,10 @@ case class Window( // Construct the ordering. This is used to compare the result of current value projection // to the result of bound value projection. This is done manually because we want to use // Code Generation (if it is enabled). 
- val (sortExprs, schema) = exprs.map { case e => - val ref = AttributeReference("ordExpr", e.dataType, e.nullable)() - (SortOrder(ref, e.direction), ref) - }.unzip - val ordering = newOrdering(sortExprs, schema) + val sortExprs = exprs.zipWithIndex.map { case (e, i) => + SortOrder(BoundReference(i, e.dataType, e.nullable), e.direction) + } + val ordering = newOrdering(sortExprs, Nil) RangeBoundOrdering(ordering, current, bound) case RowFrame => RowBoundOrdering(offset) } @@ -205,14 +204,15 @@ case class Window( */ private[this] def createResultProjection( expressions: Seq[Expression]): MutableProjection = { - val unboundToAttr = expressions.map { - e => (e, AttributeReference("windowResult", e.dataType, e.nullable)()) + val references = expressions.zipWithIndex.map{ case (e, i) => + // Results of window expressions will be on the right side of child's output + BoundReference(child.output.size + i, e.dataType, e.nullable) } - val unboundToAttrMap = unboundToAttr.toMap - val patchedWindowExpression = windowExpression.map(_.transform(unboundToAttrMap)) + val unboundToRefMap = expressions.zip(references).toMap + val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap)) newMutableProjection( projectList ++ patchedWindowExpression, - child.output ++ unboundToAttr.map(_._2))() + child.output)() } protected override def doExecute(): RDD[InternalRow] = { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 1d5ee22e99e0a..788118b6e3082 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -29,7 +29,8 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ import org.apache.spark._ -import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.{SQLContext, QueryTest} +import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext} import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.sql.types.DecimalType @@ -107,6 +108,16 @@ class HiveSparkSubmitSuite runSparkSubmit(args) } + test("SPARK-11009 fix wrong result of Window function in cluster mode") { + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val args = Seq( + "--class", SPARK_11009.getClass.getName.stripSuffix("$"), + "--name", "SparkSQLConfTest", + "--master", "local-cluster[2,1,1024]", + unusedJar.toString) + runSparkSubmit(args) + } + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. 
// This is copied from org.apache.spark.deploy.SparkSubmitSuite private def runSparkSubmit(args: Seq[String]): Unit = { @@ -314,3 +325,33 @@ object SPARK_9757 extends QueryTest with Logging { } } } + +object SPARK_11009 extends QueryTest { + import org.apache.spark.sql.functions._ + + protected var sqlContext: SQLContext = _ + + def main(args: Array[String]): Unit = { + Utils.configTestLog4j("INFO") + + val sparkContext = new SparkContext( + new SparkConf() + .set("spark.ui.enabled", "false") + .set("spark.sql.shuffle.partitions", "100")) + + val hiveContext = new TestHiveContext(sparkContext) + sqlContext = hiveContext + + try { + val df = sqlContext.range(1 << 20) + val df2 = df.select((df("id") % 1000).alias("A"), (df("id") / 1000).alias("B")) + val ws = Window.partitionBy(df2("A")).orderBy(df2("B")) + val df3 = df2.select(df2("A"), df2("B"), rowNumber().over(ws).alias("rn")).filter("rn < 0") + if (df3.rdd.count() != 0) { + throw new Exception("df3 should have 0 output row.") + } + } finally { + sparkContext.stop() + } + } +} From 77eeaad988bfec6c9c3cd7a403f5cfd0fb9bff5d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 13 Oct 2015 15:18:20 -0700 Subject: [PATCH 09/17] [SPARK-10932] [PROJECT INFRA] Port two minor changes to release-build.sh from scripts' old repo Spark's release packaging scripts used to live in a separate repository. Although these scripts are now part of the Spark repo, there are some minor patches made against the old repos that are missing in Spark's copy of the script. This PR ports those changes. /cc shivaram, who originally submitted these changes against https://github.com/rxin/spark-utils Author: Josh Rosen Closes #8986 from JoshRosen/port-release-build-fixes-from-rxin-repo. --- dev/create-release/release-build.sh | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index 9dac43ce54425..cb79e9eba06e2 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -70,7 +70,7 @@ GIT_REF=${GIT_REF:-master} # Destination directory parent on remote server REMOTE_PARENT_DIR=${REMOTE_PARENT_DIR:-/home/$ASF_USERNAME/public_html} -SSH="ssh -o StrictHostKeyChecking=no -i $ASF_RSA_KEY" +SSH="ssh -o ConnectTimeout=300 -o StrictHostKeyChecking=no -i $ASF_RSA_KEY" GPG="gpg --no-tty --batch" NEXUS_ROOT=https://repository.apache.org/service/local/staging NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging uploads @@ -141,8 +141,12 @@ if [[ "$1" == "package" ]]; then export ZINC_PORT=$ZINC_PORT echo "Creating distribution: $NAME ($FLAGS)" - ./make-distribution.sh --name $NAME --tgz $FLAGS -DzincPort=$ZINC_PORT 2>&1 > \ - ../binary-release-$NAME.log + + # Get maven home set by MVN + MVN_HOME=`$MVN -version 2>&1 | grep 'Maven home' | awk '{print $NF}'` + + ./make-distribution.sh --name $NAME --mvn $MVN_HOME/bin/mvn --tgz $FLAGS \ + -DzincPort=$ZINC_PORT 2>&1 > ../binary-release-$NAME.log cd .. cp spark-$SPARK_VERSION-bin-$NAME/spark-$SPARK_VERSION-bin-$NAME.tgz . From 15d2736af7b521a666ffb4e83cd253db08c4ac96 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 13 Oct 2015 15:59:36 -0700 Subject: [PATCH 10/17] =?UTF-8?q?[SPARK-10959]=20[PYSPARK]=20StreamingLogi?= =?UTF-8?q?sticRegressionWithSGD=20does=20not=20t=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …rain with given regParam and StreamingLinearRegressionWithSGD intercept param is not in correct position. 
regParam was being passed into the StreamingLogisticRegressionWithSGD constructor, but not transferred to the call for model training. The param is added as a named argument to the call. For StreamingLinearRegressionWithSGC the intercept parameter was not in the correct position and was being passed in as the regularization value. Author: Bryan Cutler Closes #9087 from BryanCutler/StreamingSGD-convergenceTol-bug-10959-branch-1.5. --- python/pyspark/mllib/classification.py | 3 ++- python/pyspark/mllib/regression.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 8f27c446a66e8..e4500a0bc8315 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -632,7 +632,8 @@ def update(rdd): if not rdd.isEmpty(): self._model = LogisticRegressionWithSGD.train( rdd, self.numIterations, self.stepSize, - self.miniBatchFraction, self._model.weights) + self.miniBatchFraction, self._model.weights, + regParam=self.regParam) dstream.foreachRDD(update) diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 41946e3674fbe..6bbac026507c2 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -669,7 +669,7 @@ def update(rdd): self._model = LinearRegressionWithSGD.train( rdd, self.numIterations, self.stepSize, self.miniBatchFraction, self._model.weights, - self._model.intercept) + intercept=self._model.intercept) dstream.foreachRDD(update) From 94e6d8f72bd2752f571d134146022db264b19c3b Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 13 Oct 2015 16:16:08 -0700 Subject: [PATCH 11/17] [SPARK-10389] [SQL] [1.5] support order by non-attribute grouping expression on Aggregate backport https://github.com/apache/spark/pull/8548 to 1.5 Author: Wenchen Fan Closes #9102 from cloud-fan/branch-1.5. --- .../sql/catalyst/analysis/Analyzer.scala | 72 ++++++++++--------- .../org/apache/spark/sql/SQLQuerySuite.scala | 10 +++ 2 files changed, 47 insertions(+), 35 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 6e7353fdbdfdf..47db44880f531 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -576,43 +576,47 @@ class Analyzer( filter } - case sort @ Sort(sortOrder, global, - aggregate @ Aggregate(grouping, originalAggExprs, child)) + case sort @ Sort(sortOrder, global, aggregate: Aggregate) if aggregate.resolved && !sort.resolved => // Try resolving the ordering as though it is in the aggregate clause. try { - val aliasedOrder = sortOrder.map(o => Alias(o.child, "aggOrder")()) - val aggregatedOrdering = Aggregate(grouping, aliasedOrder, child) - val resolvedOperator: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate] - def resolvedAggregateOrdering = resolvedOperator.aggregateExpressions - - // Expressions that have an aggregate can be pushed down. - val needsAggregate = resolvedAggregateOrdering.exists(containsAggregate) - - // Attribute references, that are missing from the order but are present in the grouping - // expressions can also be pushed down. 
- val requiredAttributes = resolvedAggregateOrdering.map(_.references).reduce(_ ++ _) - val missingAttributes = requiredAttributes -- aggregate.outputSet - val validPushdownAttributes = - missingAttributes.filter(a => grouping.exists(a.semanticEquals)) - - // If resolution was successful and we see the ordering either has an aggregate in it or - // it is missing something that is projected away by the aggregate, add the ordering - // the original aggregate operator. - if (resolvedOperator.resolved && (needsAggregate || validPushdownAttributes.nonEmpty)) { - val evaluatedOrderings: Seq[SortOrder] = sortOrder.zip(resolvedAggregateOrdering).map { - case (order, evaluated) => order.copy(child = evaluated.toAttribute) - } - val aggExprsWithOrdering: Seq[NamedExpression] = - resolvedAggregateOrdering ++ originalAggExprs - - Project(aggregate.output, - Sort(evaluatedOrderings, global, - aggregate.copy(aggregateExpressions = aggExprsWithOrdering))) - } else { - sort + val aliasedOrdering = sortOrder.map(o => Alias(o.child, "aggOrder")()) + val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering) + val resolvedAggregate: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate] + val resolvedAliasedOrdering: Seq[Alias] = + resolvedAggregate.aggregateExpressions.asInstanceOf[Seq[Alias]] + + // If we pass the analysis check, then the ordering expressions should only reference to + // aggregate expressions or grouping expressions, and it's safe to push them down to + // Aggregate. + checkAnalysis(resolvedAggregate) + + val originalAggExprs = aggregate.aggregateExpressions.map( + CleanupAliases.trimNonTopLevelAliases(_).asInstanceOf[NamedExpression]) + + // If the ordering expression is same with original aggregate expression, we don't need + // to push down this ordering expression and can reference the original aggregate + // expression instead. + val needsPushDown = ArrayBuffer.empty[NamedExpression] + val evaluatedOrderings = resolvedAliasedOrdering.zip(sortOrder).map { + case (evaluated, order) => + val index = originalAggExprs.indexWhere { + case Alias(child, _) => child semanticEquals evaluated.child + case other => other semanticEquals evaluated.child + } + + if (index == -1) { + needsPushDown += evaluated + order.copy(child = evaluated.toAttribute) + } else { + order.copy(child = originalAggExprs(index).toAttribute) + } } + + Project(aggregate.output, + Sort(evaluatedOrderings, global, + aggregate.copy(aggregateExpressions = originalAggExprs ++ needsPushDown))) } catch { // Attempting to resolve in the aggregate can result in ambiguity. When this happens, // just return the original plan. 
@@ -621,9 +625,7 @@ class Analyzer( } protected def containsAggregate(condition: Expression): Boolean = { - condition - .collect { case ae: AggregateExpression => ae } - .nonEmpty + condition.find(_.isInstanceOf[AggregateExpression]).isDefined } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 4f31bd0483932..598a6eac95a97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1745,4 +1745,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { df1.withColumn("diff", lit(0))) } } + + test("SPARK-10389: order by non-attribute grouping expression on Aggregate") { + withTempTable("src") { + Seq((1, 1), (-1, 1)).toDF("key", "value").registerTempTable("src") + checkAnswer(sql("SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY key + 1"), + Seq(Row(1), Row(1))) + checkAnswer(sql("SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY (key + 1) * 2"), + Seq(Row(1), Row(1))) + } + } } From 3e9d56e8b9fed2d2e4777416b146804a8ae607ab Mon Sep 17 00:00:00 2001 From: Monica Liu Date: Tue, 13 Oct 2015 22:24:52 -0700 Subject: [PATCH 12/17] [SPARK-10981] [SPARKR] SparkR Join improvements I was having issues with collect() and orderBy() in Spark 1.5.0 so I used the DataFrame.R file and test_sparkSQL.R file from the Spark 1.5.1 download. I only modified the join() function in DataFrame.R to include "full", "fullouter", "left", "right", and "leftsemi" and added corresponding test cases in the test for join() and merge() in test_sparkSQL.R file. Pull request because I filed this JIRA bug report: https://issues.apache.org/jira/browse/SPARK-10981 Author: Monica Liu Closes #9029 from mfliu/master. (cherry picked from commit 8b32885704502ab2a715cf5142d7517181074428) Signed-off-by: Shivaram Venkataraman --- R/pkg/R/DataFrame.R | 13 +++++++++---- R/pkg/inst/tests/test_sparkSQL.R | 27 +++++++++++++++++++++++++-- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index ed304c2a481bf..ffb6dbd702cc2 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1352,9 +1352,10 @@ setMethod("where", #' @param x A Spark DataFrame #' @param y A Spark DataFrame #' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a -#' Column expression. If joinExpr is omitted, join() wil perform a Cartesian join +#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join #' @param joinType The type of join to perform. The following join types are available: -#' 'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'. The default joinType is "inner". +#' 'inner', 'outer', 'full', 'fullouter', leftouter', 'left_outer', 'left', +#' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner". #' @return A DataFrame containing the result of the join operation. 
#' @rdname join #' @name join @@ -1379,11 +1380,15 @@ setMethod("join", if (is.null(joinType)) { sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc) } else { - if (joinType %in% c("inner", "outer", "left_outer", "right_outer", "semijoin")) { + if (joinType %in% c("inner", "outer", "full", "fullouter", + "leftouter", "left_outer", "left", + "rightouter", "right_outer", "right", "leftsemi")) { + joinType <- gsub("_", "", joinType) sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType) } else { stop("joinType must be one of the following types: ", - "'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'") + "'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left', + 'rightouter', 'right_outer', 'right', 'leftsemi'") } } } diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index aedc385c4ff4d..63513d9f380c1 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -952,7 +952,7 @@ test_that("join() and merge() on a DataFrame", { expect_equal(names(joined2), c("age", "name", "name", "test")) expect_equal(count(joined2), 3) - joined3 <- join(df, df2, df$name == df2$name, "right_outer") + joined3 <- join(df, df2, df$name == df2$name, "rightouter") expect_equal(names(joined3), c("age", "name", "name", "test")) expect_equal(count(joined3), 4) expect_true(is.na(collect(orderBy(joined3, joined3$age))$age[2])) @@ -963,11 +963,34 @@ test_that("join() and merge() on a DataFrame", { expect_equal(count(joined4), 4) expect_equal(collect(orderBy(joined4, joined4$name))$newAge[3], 24) + joined5 <- join(df, df2, df$name == df2$name, "leftouter") + expect_equal(names(joined5), c("age", "name", "name", "test")) + expect_equal(count(joined5), 3) + expect_true(is.na(collect(orderBy(joined5, joined5$age))$age[1])) + + joined6 <- join(df, df2, df$name == df2$name, "inner") + expect_equal(names(joined6), c("age", "name", "name", "test")) + expect_equal(count(joined6), 3) + + joined7 <- join(df, df2, df$name == df2$name, "leftsemi") + expect_equal(names(joined7), c("age", "name")) + expect_equal(count(joined7), 3) + + joined8 <- join(df, df2, df$name == df2$name, "left_outer") + expect_equal(names(joined8), c("age", "name", "name", "test")) + expect_equal(count(joined8), 3) + expect_true(is.na(collect(orderBy(joined8, joined8$age))$age[1])) + + joined9 <- join(df, df2, df$name == df2$name, "right_outer") + expect_equal(names(joined9), c("age", "name", "name", "test")) + expect_equal(count(joined9), 4) + expect_true(is.na(collect(orderBy(joined9, joined9$age))$age[2])) + merged <- select(merge(df, df2, df$name == df2$name, "outer"), alias(df$age + 5, "newAge"), df$name, df2$test) expect_equal(names(merged), c("newAge", "name", "test")) expect_equal(count(merged), 4) - expect_equal(collect(orderBy(merged, joined4$name))$newAge[3], 24) + expect_equal(collect(orderBy(merged, merged$name))$newAge[3], 24) }) test_that("toJSON() returns an RDD of the correct values", { From ad5bf0efb54c8f625158616f86ee6708966b525c Mon Sep 17 00:00:00 2001 From: Tom Graves Date: Wed, 14 Oct 2015 10:12:25 -0700 Subject: [PATCH 13/17] [SPARK-10619] Can't sort columns on Executor Page should pick into spark 1.5.2 also. 
https://issues.apache.org/jira/browse/SPARK-10619 looks like this was broken by commit: https://github.com/apache/spark/commit/fb1d06fc242ec00320f1a3049673fbb03c4a6eb9#diff-b8adb646ef90f616c34eb5c98d1ebd16 It looks like somethings were change to use the UIUtils.listingTable but executor page wasn't converted so when it removed sortable from the UIUtils. TABLE_CLASS_NOT_STRIPED it broke this page. Simply add the sortable tag back in and it fixes both active UI and the history server UI. Author: Tom Graves Closes #9101 from tgravescs/SPARK-10619. (cherry picked from commit 135a2ce5b0b927b512c832d61c25e7b9d57e30be) Signed-off-by: Reynold Xin --- core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 1 + .../src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 2 +- .../src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala | 2 +- .../main/scala/org/apache/spark/streaming/ui/BatchPage.scala | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 21dc8f0b65485..68a9f912a5d2c 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -31,6 +31,7 @@ import org.apache.spark.ui.scope.RDDOperationGraph private[spark] object UIUtils extends Logging { val TABLE_CLASS_NOT_STRIPED = "table table-bordered table-condensed" val TABLE_CLASS_STRIPED = TABLE_CLASS_NOT_STRIPED + " table-striped" + val TABLE_CLASS_STRIPED_SORTABLE = TABLE_CLASS_STRIPED + " sortable" // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. private val dateFormat = new ThreadLocal[SimpleDateFormat]() { diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 01cddda4c62cd..1a29b0f412603 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -62,7 +62,7 @@ private[ui] class ExecutorsPage( val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty val execTable = - +
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index d5cdbfac104f8..be144f6065baa 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -50,7 +50,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage hasBytesSpilled = data.hasBytesSpilled }) -
+    <table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala index 1b717b64542d5..a19b85a51d289 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala @@ -443,7 +443,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { } def generateInputMetadataTable(inputMetadatas: Seq[(Int, String)]): Seq[Node] = { -
+    <table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
From f36624983caf5c3196e4efa8b490337ee43b9e28 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 26 Sep 2015 19:08:55 -0700 Subject: [PATCH 14/17] [SPARK-10845] [SQL] Makes spark.sql.hive.version a SQLConfEntry When refactoring SQL options from plain strings to the strongly typed `SQLConfEntry`, `spark.sql.hive.version` wasn't migrated, and doesn't show up in the result of `SET -v`, as `SET -v` only shows public `SQLConfEntry` instances. This affects compatibility with Simba ODBC driver. This PR migrates this SQL option as a `SQLConfEntry` to fix this issue. Author: Cheng Lian Closes #8925 from liancheng/spark-10845/hive-version-conf. (cherry picked from commit 6f94d56a95e8c3a410a8d0c6a24ccca043227ba9) Signed-off-by: Reynold Xin --- .../hive/thriftserver/HiveThriftServer2.scala | 1 - .../HiveThriftServer2Suites.scala | 27 +++++++++++++++++++ .../apache/spark/sql/hive/HiveContext.scala | 5 ++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index a0643cec0fb7c..a4fd0c3ce9702 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -55,7 +55,6 @@ object HiveThriftServer2 extends Logging { @DeveloperApi def startWithContext(sqlContext: HiveContext): Unit = { val server = new HiveThriftServer2(sqlContext) - sqlContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) server.init(sqlContext.hiveconf) server.start() listener = new HiveThriftServer2Listener(server, sqlContext.conf) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index b72249b3bf8c0..19b2f24456ab0 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -21,6 +21,7 @@ import java.io.File import java.net.URL import java.sql.{Date, DriverManager, SQLException, Statement} +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ @@ -431,6 +432,32 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } ) } + + test("Checks Hive version via SET -v") { + withJdbcStatement { statement => + val resultSet = statement.executeQuery("SET -v") + + val conf = mutable.Map.empty[String, String] + while (resultSet.next()) { + conf += resultSet.getString(1) -> resultSet.getString(2) + } + + assert(conf.get("spark.sql.hive.version") === Some("1.2.1")) + } + } + + test("Checks Hive version via SET") { + withJdbcStatement { statement => + val resultSet = statement.executeQuery("SET") + + val conf = mutable.Map.empty[String, String] + while (resultSet.next()) { + conf += resultSet.getString(1) -> resultSet.getString(2) + } + + assert(conf.get("spark.sql.hive.version") === Some("1.2.1")) + } + } } class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 0ea9252d71e3a..5dc3d81bd808f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -610,6 +610,11 @@ private[hive] object HiveContext { doc = "Version of the Hive metastore. Available options are " + s"0.12.0 through $hiveExecutionVersion.") + val HIVE_EXECUTION_VERSION = stringConf( + key = "spark.sql.hive.version", + defaultValue = Some(hiveExecutionVersion), + doc = "Version of Hive used internally by Spark SQL.") + val HIVE_METASTORE_JARS = stringConf("spark.sql.hive.metastore.jars", defaultValue = Some("builtin"), doc = s""" From 30eea40fff97391b8ee3201dd7c6ea7440521386 Mon Sep 17 00:00:00 2001 From: Jian Feng Date: Mon, 21 Sep 2015 23:36:41 -0700 Subject: [PATCH 15/17] [SPARK-10577] [PYSPARK] DataFrame hint for broadcast join https://issues.apache.org/jira/browse/SPARK-10577 Author: Jian Feng Closes #8801 from Jianfeng-chs/master. (cherry picked from commit 0180b849dbaf191826231eda7dfaaf146a19602b) Signed-off-by: Reynold Xin Conflicts: python/pyspark/sql/tests.py --- python/pyspark/sql/functions.py | 9 +++++++++ python/pyspark/sql/tests.py | 27 +++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 4b74a501521a5..3c631a0328dee 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -30,6 +30,7 @@ from pyspark.sql import since from pyspark.sql.types import StringType from pyspark.sql.column import Column, _to_java_column, _to_seq +from pyspark.sql.dataframe import DataFrame def _create_function(name, doc=""): @@ -190,6 +191,14 @@ def approxCountDistinct(col, rsd=None): return Column(jc) +@since(1.6) +def broadcast(df): + """Marks a DataFrame as small enough for use in broadcast joins.""" + + sc = SparkContext._active_spark_context + return DataFrame(sc._jvm.functions.broadcast(df._jdf), df.sql_ctx) + + @since(1.4) def coalesce(*cols): """Returns the first column that is not null. 
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 6b647f3aacfa1..14414b3d566f8 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1056,6 +1056,33 @@ def test_with_column_with_existing_name(self): keys = self.df.withColumn("key", self.df.key).select("key").collect() self.assertEqual([r.key for r in keys], list(range(100))) + # regression test for SPARK-10417 + def test_column_iterator(self): + + def foo(): + for x in self.df.key: + break + + self.assertRaises(TypeError, foo) + + # add test for SPARK-10577 (test broadcast join hint) + def test_functions_broadcast(self): + from pyspark.sql.functions import broadcast + + df1 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value")) + df2 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value")) + + # equijoin - should be converted into broadcast join + plan1 = df1.join(broadcast(df2), "key")._jdf.queryExecution().executedPlan() + self.assertEqual(1, plan1.toString().count("BroadcastHashJoin")) + + # no join key -- should not be a broadcast join + plan2 = df1.join(broadcast(df2))._jdf.queryExecution().executedPlan() + self.assertEqual(0, plan2.toString().count("BroadcastHashJoin")) + + # planner should not crash without a join + broadcast(df1)._jdf.queryExecution().executedPlan() + class HiveContextSQLTests(ReusedPySparkTestCase): From c27e19042e66446f3a4fb4673468b2b83633ea89 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 14 Oct 2015 12:31:29 -0700 Subject: [PATCH 16/17] [SPARK-8386] [SQL] add write.mode for insertIntoJDBC when the parm overwrite is false the fix is for jira https://issues.apache.org/jira/browse/SPARK-8386 Author: Huaxin Gao Closes #9042 from huaxingao/spark8386. (cherry picked from commit 7e1308d37f6ca35f063e67e4b87a77e932ad89a5) Signed-off-by: Reynold Xin --- sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index f024db7ae2cdd..3fce47fa655d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1625,7 +1625,7 @@ class DataFrame private[sql]( */ @deprecated("Use write.jdbc()", "1.4.0") def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = { - val w = if (overwrite) write.mode(SaveMode.Overwrite) else write + val w = if (overwrite) write.mode(SaveMode.Overwrite) else write.mode(SaveMode.Append) w.jdbc(url, table, new Properties) } From ab6daa5bec1958561b0448d3175449642893837d Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Thu, 15 Oct 2015 10:03:01 -0700 Subject: [PATCH 17/17] fixed mismerge --- .../scala/org/apache/spark/sql/execution/Window.scala | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 525df932338ee..55035f4bc5f2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -200,7 +200,6 @@ case class Window( * This method uses Code Generation. It can only be used on the executor side. * * @param expressions unbound ordered function expressions. - * @param attributes output attributes * @return the final resulting projection. 
*/ private[this] def createResultProjection( @@ -248,17 +247,12 @@ case class Window( factories(index) = () => createFrameProcessor(frame, functions, ordinal) } - // AttributeReference can only be created in driver, or the id will not be unique - val outputAttributes = unboundExpressions.map { - e => AttributeReference("windowResult", e.dataType, e.nullable)() - } - // Start processing. child.execute().mapPartitions { stream => new Iterator[InternalRow] { // Get all relevant projections. - val result = createResultProjection(unboundExpressions, outputAttributes) + val result = createResultProjection(unboundExpressions) val grouping = if (child.outputsUnsafeRows) { UnsafeProjection.create(partitionSpec, child.output) } else {
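The fragment handling added to `Utils.resolveURI` in PATCH 01 is the subtle change in this series: `new File(path).toURI()` escapes `#` to `%23`, so the fragment has to be split off before the file part is made absolute and re-attached afterwards. Below is a minimal, self-contained sketch of that logic; the object name, `main` harness, and sample inputs are illustrative scaffolding rather than part of any patch.

```scala
import java.io.File
import java.net.{URI, URISyntaxException}

object ResolveURISketch {
  /** Resolve a possibly-relative path to a URI, preserving any '#fragment'. */
  def resolveURI(path: String): URI = {
    try {
      val uri = new URI(path)
      if (uri.getScheme() != null) {
        return uri // already a full URI (hdfs:, file:, ...), keep it as-is
      }
      if (uri.getFragment() != null) {
        // Make only the file part absolute, then re-attach the fragment so that
        // File.toURI() never gets a chance to escape '#' into '%23'.
        val absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI()
        return new URI(absoluteURI.getScheme(), absoluteURI.getHost(),
          absoluteURI.getPath(), uri.getFragment())
      }
    } catch {
      case _: URISyntaxException => // fall through to the plain-file default
    }
    new File(path).getAbsoluteFile().toURI()
  }

  def main(args: Array[String]): Unit = {
    println(resolveURI("spark.jar#app.jar"))              // file:/<cwd>/spark.jar#app.jar
    println(resolveURI("hdfs:///root/spark.jar#app.jar")) // unchanged, fragment intact
  }
}
```

The same escaping behavior is why the expected values in UtilsSuite change from `%23app.jar` to `#app.jar` in PATCH 01.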