diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index ed304c2a481bf..ffb6dbd702cc2 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1352,9 +1352,10 @@ setMethod("where",
 #' @param x A Spark DataFrame
 #' @param y A Spark DataFrame
 #' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
-#' Column expression. If joinExpr is omitted, join() wil perform a Cartesian join
+#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join
 #' @param joinType The type of join to perform. The following join types are available:
-#' 'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'. The default joinType is "inner".
+#' 'inner', 'outer', 'full', 'fullouter', leftouter', 'left_outer', 'left',
+#' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner".
 #' @return A DataFrame containing the result of the join operation.
 #' @rdname join
 #' @name join
@@ -1379,11 +1380,15 @@ setMethod("join",
               if (is.null(joinType)) {
                 sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc)
               } else {
-                if (joinType %in% c("inner", "outer", "left_outer", "right_outer", "semijoin")) {
+                if (joinType %in% c("inner", "outer", "full", "fullouter",
+                    "leftouter", "left_outer", "left",
+                    "rightouter", "right_outer", "right", "leftsemi")) {
+                  joinType <- gsub("_", "", joinType)
                   sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType)
                 } else {
                   stop("joinType must be one of the following types: ",
-                       "'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'")
+                      "'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left',
+                      'rightouter', 'right_outer', 'right', 'leftsemi'")
                 }
               }
             }
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index aedc385c4ff4d..63513d9f380c1 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -952,7 +952,7 @@ test_that("join() and merge() on a DataFrame", {
   expect_equal(names(joined2), c("age", "name", "name", "test"))
   expect_equal(count(joined2), 3)
 
-  joined3 <- join(df, df2, df$name == df2$name, "right_outer")
+  joined3 <- join(df, df2, df$name == df2$name, "rightouter")
   expect_equal(names(joined3), c("age", "name", "name", "test"))
   expect_equal(count(joined3), 4)
   expect_true(is.na(collect(orderBy(joined3, joined3$age))$age[2]))
@@ -963,11 +963,34 @@ test_that("join() and merge() on a DataFrame", {
   expect_equal(count(joined4), 4)
   expect_equal(collect(orderBy(joined4, joined4$name))$newAge[3], 24)
 
+  joined5 <- join(df, df2, df$name == df2$name, "leftouter")
+  expect_equal(names(joined5), c("age", "name", "name", "test"))
+  expect_equal(count(joined5), 3)
+  expect_true(is.na(collect(orderBy(joined5, joined5$age))$age[1]))
+
+  joined6 <- join(df, df2, df$name == df2$name, "inner")
+  expect_equal(names(joined6), c("age", "name", "name", "test"))
+  expect_equal(count(joined6), 3)
+
+  joined7 <- join(df, df2, df$name == df2$name, "leftsemi")
+  expect_equal(names(joined7), c("age", "name"))
+  expect_equal(count(joined7), 3)
+
+  joined8 <- join(df, df2, df$name == df2$name, "left_outer")
+  expect_equal(names(joined8), c("age", "name", "name", "test"))
+  expect_equal(count(joined8), 3)
+  expect_true(is.na(collect(orderBy(joined8, joined8$age))$age[1]))
+
+  joined9 <- join(df, df2, df$name == df2$name, "right_outer")
+  expect_equal(names(joined9), c("age", "name", "name", "test"))
+  expect_equal(count(joined9), 4)
+  expect_true(is.na(collect(orderBy(joined9, joined9$age))$age[2]))
+
   merged <- select(merge(df, df2, df$name == df2$name, "outer"),
                    alias(df$age + 5, "newAge"), df$name, df2$test)
   expect_equal(names(merged), c("newAge", "name", "test"))
   expect_equal(count(merged), 4)
-  expect_equal(collect(orderBy(merged, joined4$name))$newAge[3], 24)
+  expect_equal(collect(orderBy(merged, merged$name))$newAge[3], 24)
 })
 
 test_that("toJSON() returns an RDD of the correct values", {
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 21dc8f0b65485..68a9f912a5d2c 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -31,6 +31,7 @@ import org.apache.spark.ui.scope.RDDOperationGraph
 private[spark] object UIUtils extends Logging {
   val TABLE_CLASS_NOT_STRIPED = "table table-bordered table-condensed"
   val TABLE_CLASS_STRIPED = TABLE_CLASS_NOT_STRIPED + " table-striped"
+  val TABLE_CLASS_STRIPED_SORTABLE = TABLE_CLASS_STRIPED + " sortable"
 
   // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
   private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 01cddda4c62cd..1a29b0f412603 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -62,7 +62,7 @@ private[ui] class ExecutorsPage(
     val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
 
     val execTable =
-      
+      
         
           | Executor ID | 
           Address | 
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index d5cdbfac104f8..be144f6065baa 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -50,7 +50,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
         hasBytesSpilled = data.hasBytesSpilled
     })
 
-    
+    
       
         | Executor ID | 
         Address | 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8313312226713..2dd2c036c9307 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1751,6 +1751,13 @@ private[spark] object Utils extends Logging {
       if (uri.getScheme() != null) {
         return uri
       }
+      // make sure to handle if the path has a fragment (applies to yarn
+      // distributed cache)
+      if (uri.getFragment() != null) {
+        val absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI()
+        return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(),
+          uri.getFragment())
+      }
     } catch {
       case e: URISyntaxException =>
     }
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 1fb81ad565b41..68b0da76bc134 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -384,7 +384,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assertResolves("hdfs:/root/spark.jar", "hdfs:/root/spark.jar")
     assertResolves("hdfs:///root/spark.jar#app.jar", "hdfs:/root/spark.jar#app.jar")
     assertResolves("spark.jar", s"file:$cwd/spark.jar")
-    assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar%23app.jar")
+    assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar#app.jar")
     assertResolves("path to/file.txt", s"file:$cwd/path%20to/file.txt")
     if (Utils.isWindows) {
       assertResolves("C:\\path\\to\\file.txt", "file:/C:/path/to/file.txt")
@@ -414,10 +414,10 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2")
     assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
     assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5,path to/jar6",
-      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4%23jar5,file:$cwd/path%20to/jar6")
+      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5,file:$cwd/path%20to/jar6")
     if (Utils.isWindows) {
       assertResolves("""hdfs:/jar1,file:/jar2,jar3,C:\pi.py#py.pi,C:\path to\jar4""",
-        s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py%23py.pi,file:/C:/path%20to/jar4")
+        s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi,file:/C:/path%20to/jar4")
     }
   }
 
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index 9dac43ce54425..cb79e9eba06e2 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -70,7 +70,7 @@ GIT_REF=${GIT_REF:-master}
 # Destination directory parent on remote server
 REMOTE_PARENT_DIR=${REMOTE_PARENT_DIR:-/home/$ASF_USERNAME/public_html}
 
-SSH="ssh -o StrictHostKeyChecking=no -i $ASF_RSA_KEY"
+SSH="ssh -o ConnectTimeout=300 -o StrictHostKeyChecking=no -i $ASF_RSA_KEY"
 GPG="gpg --no-tty --batch"
 NEXUS_ROOT=https://repository.apache.org/service/local/staging
 NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging uploads
@@ -141,8 +141,12 @@ if [[ "$1" == "package" ]]; then
 
     export ZINC_PORT=$ZINC_PORT
     echo "Creating distribution: $NAME ($FLAGS)"
-    ./make-distribution.sh --name $NAME --tgz $FLAGS -DzincPort=$ZINC_PORT 2>&1 > \
-      ../binary-release-$NAME.log
+
+    # Get maven home set by MVN
+    MVN_HOME=`$MVN -version 2>&1 | grep 'Maven home' | awk '{print $NF}'`
+
+    ./make-distribution.sh --name $NAME --mvn $MVN_HOME/bin/mvn --tgz $FLAGS \
+      -DzincPort=$ZINC_PORT 2>&1 >  ../binary-release-$NAME.log
     cd ..
     cp spark-$SPARK_VERSION-bin-$NAME/spark-$SPARK_VERSION-bin-$NAME.tgz .
 
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 4db32cfd628bc..de9729f0c00a5 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -205,6 +205,11 @@ can be set to control the SBT build. For example:
 
     build/sbt -Pyarn -Phadoop-2.3 assembly
 
+To avoid the overhead of launching sbt each time you need to re-compile, you can launch sbt
+in interactive mode by running `build/sbt`, and then run all build commands at the command
+prompt. For more recommendations on reducing build time, refer to the
+[wiki page](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-ReducingBuildTimes).
+
 # Testing with SBT
 
 Some of the tests require Spark to be packaged first, so always run `build/sbt assembly` the first time.  The following is an example of a correct (build, test) sequence:
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index a302ed8ec2435..e4500a0bc8315 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -633,7 +633,7 @@ def update(rdd):
                 self._model = LogisticRegressionWithSGD.train(
                     rdd, self.numIterations, self.stepSize,
                     self.miniBatchFraction, self._model.weights,
-                    regParam=self.regParam, convergenceTol=self.convergenceTol)
+                    regParam=self.regParam)
 
         dstream.foreachRDD(update)
 
diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py
index 30d05d0ec3609..2db2dec275bd9 100644
--- a/python/pyspark/mllib/linalg/__init__.py
+++ b/python/pyspark/mllib/linalg/__init__.py
@@ -732,6 +732,9 @@ def __getitem__(self, index):
             raise ValueError("Index %d out of bounds." % index)
 
         insert_index = np.searchsorted(inds, index)
+        if insert_index >= inds.size:
+            return 0.
+
         row_ind = inds[insert_index]
         if row_ind == index:
             return vals[insert_index]
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 562fb36158043..6bbac026507c2 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -669,7 +669,7 @@ def update(rdd):
                 self._model = LinearRegressionWithSGD.train(
                     rdd, self.numIterations, self.stepSize,
                     self.miniBatchFraction, self._model.weights,
-                    intercept=self._model.intercept, convergenceTol=self.convergenceTol)
+                    intercept=self._model.intercept)
 
         dstream.foreachRDD(update)
 
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 5097c5e8ba4cd..4d471443d192c 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -205,15 +205,17 @@ def test_conversion(self):
         self.assertTrue(dv.array.dtype == 'float64')
 
     def test_sparse_vector_indexing(self):
-        sv = SparseVector(4, {1: 1, 3: 2})
-        self.assertEquals(sv[0], 0.)
-        self.assertEquals(sv[3], 2.)
-        self.assertEquals(sv[1], 1.)
-        self.assertEquals(sv[2], 0.)
-        self.assertEquals(sv[-1], 2)
-        self.assertEquals(sv[-2], 0)
-        self.assertEquals(sv[-4], 0)
-        for ind in [4, -5]:
+        sv = SparseVector(5, {1: 1, 3: 2})
+        self.assertEqual(sv[0], 0.)
+        self.assertEqual(sv[3], 2.)
+        self.assertEqual(sv[1], 1.)
+        self.assertEqual(sv[2], 0.)
+        self.assertEqual(sv[4], 0.)
+        self.assertEqual(sv[-1], 0.)
+        self.assertEqual(sv[-2], 2.)
+        self.assertEqual(sv[-3], 0.)
+        self.assertEqual(sv[-5], 0.)
+        for ind in [5, -6]:
             self.assertRaises(ValueError, sv.__getitem__, ind)
         for ind in [7.8, '1']:
             self.assertRaises(TypeError, sv.__getitem__, ind)
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 4b74a501521a5..3c631a0328dee 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -30,6 +30,7 @@
 from pyspark.sql import since
 from pyspark.sql.types import StringType
 from pyspark.sql.column import Column, _to_java_column, _to_seq
+from pyspark.sql.dataframe import DataFrame
 
 
 def _create_function(name, doc=""):
@@ -190,6 +191,14 @@ def approxCountDistinct(col, rsd=None):
     return Column(jc)
 
 
+@since(1.6)
+def broadcast(df):
+    """Marks a DataFrame as small enough for use in broadcast joins."""
+
+    sc = SparkContext._active_spark_context
+    return DataFrame(sc._jvm.functions.broadcast(df._jdf), df.sql_ctx)
+
+
 @since(1.4)
 def coalesce(*cols):
     """Returns the first column that is not null.
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 6b647f3aacfa1..14414b3d566f8 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1056,6 +1056,33 @@ def test_with_column_with_existing_name(self):
         keys = self.df.withColumn("key", self.df.key).select("key").collect()
         self.assertEqual([r.key for r in keys], list(range(100)))
 
+    # regression test for SPARK-10417
+    def test_column_iterator(self):
+
+        def foo():
+            for x in self.df.key:
+                break
+
+        self.assertRaises(TypeError, foo)
+
+    # add test for SPARK-10577 (test broadcast join hint)
+    def test_functions_broadcast(self):
+        from pyspark.sql.functions import broadcast
+
+        df1 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
+        df2 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
+
+        # equijoin - should be converted into broadcast join
+        plan1 = df1.join(broadcast(df2), "key")._jdf.queryExecution().executedPlan()
+        self.assertEqual(1, plan1.toString().count("BroadcastHashJoin"))
+
+        # no join key -- should not be a broadcast join
+        plan2 = df1.join(broadcast(df2))._jdf.queryExecution().executedPlan()
+        self.assertEqual(0, plan2.toString().count("BroadcastHashJoin"))
+
+        # planner should not crash without a join
+        broadcast(df1)._jdf.queryExecution().executedPlan()
+
 
 class HiveContextSQLTests(ReusedPySparkTestCase):
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index aacfbf9af760c..47db44880f531 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -825,6 +825,10 @@ class Analyzer(
             val withName = Alias(agg, s"_w${extractedExprBuffer.length}")()
             extractedExprBuffer += withName
             withName.toAttribute
+
+          // Extracts other attributes
+          case attr: Attribute => extractExpr(attr)
+
         }.asInstanceOf[NamedExpression]
       }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index f024db7ae2cdd..3fce47fa655d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1625,7 +1625,7 @@ class DataFrame private[sql](
    */
   @deprecated("Use write.jdbc()", "1.4.0")
   def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = {
-    val w = if (overwrite) write.mode(SaveMode.Overwrite) else write
+    val w = if (overwrite) write.mode(SaveMode.Overwrite) else write.mode(SaveMode.Append)
     w.jdbc(url, table, new Properties)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 7e204ec88aa1f..55035f4bc5f2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -145,13 +145,10 @@ case class Window(
         // Construct the ordering. This is used to compare the result of current value projection
         // to the result of bound value projection. This is done manually because we want to use
         // Code Generation (if it is enabled).
-        val (sortExprs, schema) = exprs.map { case e =>
-          // This AttributeReference does not need to have unique IDs, it's OK to be called
-          // in executor.
-          val ref = AttributeReference("ordExpr", e.dataType, e.nullable)()
-          (SortOrder(ref, e.direction), ref)
-        }.unzip
-        val ordering = newOrdering(sortExprs, schema)
+        val sortExprs = exprs.zipWithIndex.map { case (e, i) =>
+          SortOrder(BoundReference(i, e.dataType, e.nullable), e.direction)
+        }
+        val ordering = newOrdering(sortExprs, Nil)
         RangeBoundOrdering(ordering, current, bound)
       case RowFrame => RowBoundOrdering(offset)
     }
@@ -203,17 +200,19 @@ case class Window(
    * This method uses Code Generation. It can only be used on the executor side.
    *
    * @param expressions unbound ordered function expressions.
-   * @param attributes output attributes
    * @return the final resulting projection.
    */
   private[this] def createResultProjection(
-      expressions: Seq[Expression],
-      attributes: Seq[Attribute]): MutableProjection = {
-    val unboundToAttrMap = expressions.zip(attributes).toMap
-    val patchedWindowExpression = windowExpression.map(_.transform(unboundToAttrMap))
+      expressions: Seq[Expression]): MutableProjection = {
+    val references = expressions.zipWithIndex.map{ case (e, i) =>
+      // Results of window expressions will be on the right side of child's output
+      BoundReference(child.output.size + i, e.dataType, e.nullable)
+    }
+    val unboundToRefMap = expressions.zip(references).toMap
+    val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
     newMutableProjection(
       projectList ++ patchedWindowExpression,
-      child.output ++ attributes)()
+      child.output)()
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -248,17 +247,12 @@ case class Window(
         factories(index) = () => createFrameProcessor(frame, functions, ordinal)
     }
 
-    // AttributeReference can only be created in driver, or the id will not be unique
-    val outputAttributes = unboundExpressions.map {
-      e => AttributeReference("windowResult", e.dataType, e.nullable)()
-    }
-
     // Start processing.
     child.execute().mapPartitions { stream =>
       new Iterator[InternalRow] {
 
         // Get all relevant projections.
-        val result = createResultProjection(unboundExpressions, outputAttributes)
+        val result = createResultProjection(unboundExpressions)
         val grouping = if (child.outputsUnsafeRows) {
           UnsafeProjection.create(partitionSpec, child.output)
         } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index eb5bffc18f846..223a2fbfbe020 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1756,4 +1756,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         df1.withColumn("diff", lit(0)))
     }
   }
+
+  test("SPARK-10389: order by non-attribute grouping expression on Aggregate") {
+    withTempTable("src") {
+      Seq((1, 1), (-1, 1)).toDF("key", "value").registerTempTable("src")
+      checkAnswer(sql("SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY key + 1"),
+        Seq(Row(1), Row(1)))
+      checkAnswer(sql("SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY (key + 1) * 2"),
+        Seq(Row(1), Row(1)))
+    }
+  }
 }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index a0643cec0fb7c..a4fd0c3ce9702 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -55,7 +55,6 @@ object HiveThriftServer2 extends Logging {
   @DeveloperApi
   def startWithContext(sqlContext: HiveContext): Unit = {
     val server = new HiveThriftServer2(sqlContext)
-    sqlContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
     server.init(sqlContext.hiveconf)
     server.start()
     listener = new HiveThriftServer2Listener(server, sqlContext.conf)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index b72249b3bf8c0..19b2f24456ab0 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.net.URL
 import java.sql.{Date, DriverManager, SQLException, Statement}
 
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
@@ -431,6 +432,32 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       }
     )
   }
+
+  test("Checks Hive version via SET -v") {
+    withJdbcStatement { statement =>
+      val resultSet = statement.executeQuery("SET -v")
+
+      val conf = mutable.Map.empty[String, String]
+      while (resultSet.next()) {
+        conf += resultSet.getString(1) -> resultSet.getString(2)
+      }
+
+      assert(conf.get("spark.sql.hive.version") === Some("1.2.1"))
+    }
+  }
+
+  test("Checks Hive version via SET") {
+    withJdbcStatement { statement =>
+      val resultSet = statement.executeQuery("SET")
+
+      val conf = mutable.Map.empty[String, String]
+      while (resultSet.next()) {
+        conf += resultSet.getString(1) -> resultSet.getString(2)
+      }
+
+      assert(conf.get("spark.sql.hive.version") === Some("1.2.1"))
+    }
+  }
 }
 
 class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 7af84057cee3b..8a6236ce018ff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -648,6 +648,11 @@ private[hive] object HiveContext {
     doc = "Version of the Hive metastore. Available options are " +
         s"0.12.0 through $hiveExecutionVersion.")
 
+  val HIVE_EXECUTION_VERSION = stringConf(
+    key = "spark.sql.hive.version",
+    defaultValue = Some(hiveExecutionVersion),
+    doc = "Version of Hive used internally by Spark SQL.")
+
   val HIVE_METASTORE_JARS = stringConf("spark.sql.hive.metastore.jars",
     defaultValue = Some("builtin"),
     doc = s"""
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 1d5ee22e99e0a..788118b6e3082 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -29,7 +29,8 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{SQLContext, QueryTest}
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.sql.types.DecimalType
@@ -107,6 +108,16 @@ class HiveSparkSubmitSuite
     runSparkSubmit(args)
   }
 
+  test("SPARK-11009 fix wrong result of Window function in cluster mode") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val args = Seq(
+      "--class", SPARK_11009.getClass.getName.stripSuffix("$"),
+      "--name", "SparkSQLConfTest",
+      "--master", "local-cluster[2,1,1024]",
+      unusedJar.toString)
+    runSparkSubmit(args)
+  }
+
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   // This is copied from org.apache.spark.deploy.SparkSubmitSuite
   private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -314,3 +325,33 @@ object SPARK_9757 extends QueryTest with Logging {
     }
   }
 }
+
+object SPARK_11009 extends QueryTest {
+  import org.apache.spark.sql.functions._
+
+  protected var sqlContext: SQLContext = _
+
+  def main(args: Array[String]): Unit = {
+    Utils.configTestLog4j("INFO")
+
+    val sparkContext = new SparkContext(
+      new SparkConf()
+        .set("spark.ui.enabled", "false")
+        .set("spark.sql.shuffle.partitions", "100"))
+
+    val hiveContext = new TestHiveContext(sparkContext)
+    sqlContext = hiveContext
+
+    try {
+      val df = sqlContext.range(1 << 20)
+      val df2 = df.select((df("id") % 1000).alias("A"), (df("id") / 1000).alias("B"))
+      val ws = Window.partitionBy(df2("A")).orderBy(df2("B"))
+      val df3 = df2.select(df2("A"), df2("B"), rowNumber().over(ws).alias("rn")).filter("rn < 0")
+      if (df3.rdd.count() != 0) {
+        throw new Exception("df3 should have 0 output row.")
+      }
+    } finally {
+      sparkContext.stop()
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 72038f70df070..c98fddb3cb47b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -835,6 +835,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils {
       ).map(i => Row(i._1, i._2, i._3)))
   }
 
+  test("window function: refer column in inner select block") {
+    val data = Seq(
+      WindowData(1, "a", 5),
+      WindowData(2, "a", 6),
+      WindowData(3, "b", 7),
+      WindowData(4, "b", 8),
+      WindowData(5, "c", 9),
+      WindowData(6, "c", 10)
+    )
+    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+    checkAnswer(
+      sql(
+        """
+          |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1
+          |from (select month, area, product, 1 as tmp1 from windowData) tmp
+        """.stripMargin),
+      Seq(
+        ("a", 2),
+        ("a", 3),
+        ("b", 2),
+        ("b", 3),
+        ("c", 2),
+        ("c", 3)
+      ).map(i => Row(i._1, i._2)))
+  }
+
   test("window function: partition and order expressions") {
     val data = Seq(
       WindowData(1, "a", 5),
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 1b717b64542d5..a19b85a51d289 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -443,7 +443,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
   }
 
   def generateInputMetadataTable(inputMetadatas: Seq[(Int, String)]): Seq[Node] = {
-    
+    
       
         
           | Input | 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 28228c2656a5c..f21f5efbfb196 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -318,7 +318,8 @@ private[spark] class Client(
         destName: Option[String] = None,
         targetDir: Option[String] = None,
         appMasterOnly: Boolean = false): (Boolean, String) = {
-      val localURI = new URI(path.trim())
+      val trimmedPath = path.trim()
+      val localURI = Utils.resolveURI(trimmedPath)
       if (localURI.getScheme != LOCAL_SCHEME) {
         if (addDistributedUri(localURI)) {
           val localPath = getQualifiedLocalPath(localURI, hadoopConf)
@@ -334,7 +335,7 @@ private[spark] class Client(
           (false, null)
         }
       } else {
-        (true, path.trim())
+        (true, trimmedPath)
       }
     }
 
@@ -555,10 +556,10 @@ private[spark] class Client(
         LOCALIZED_PYTHON_DIR)
     }
     (pySparkArchives ++ pyArchives).foreach { path =>
-      val uri = new URI(path)
+      val uri = Utils.resolveURI(path)
       if (uri.getScheme != LOCAL_SCHEME) {
         pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
-          new Path(path).getName())
+          new Path(uri).getName())
       } else {
         pythonPath += uri.getPath()
       }
@@ -1143,7 +1144,7 @@ object Client extends Logging {
         } else {
           getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR))
         }
-      mainJar.foreach(addFileToClasspath(sparkConf, _, APP_JAR, env))
+      mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR, env))
 
       val secondaryJars =
         if (args != null) {
@@ -1152,10 +1153,10 @@ object Client extends Logging {
           getSecondaryJarUris(sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
         }
       secondaryJars.foreach { x =>
-        addFileToClasspath(sparkConf, x, null, env)
+        addFileToClasspath(sparkConf, conf, x, null, env)
       }
     }
-    addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+    addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
     populateHadoopClasspath(conf, env)
     sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
@@ -1175,7 +1176,7 @@ object Client extends Logging {
 
   private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
     mainJar.flatMap { path =>
-      val uri = new URI(path)
+      val uri = Utils.resolveURI(path)
       if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None
     }.orElse(Some(new URI(APP_JAR)))
   }
@@ -1190,15 +1191,17 @@ object Client extends Logging {
    * If an alternate name for the file is given, and it's not a "local:" file, the alternate
    * name will be added to the classpath (relative to the job's work directory).
    *
-   * If not a "local:" file and no alternate name, the environment is not modified.
+   * If not a "local:" file and no alternate name, the linkName will be added to the classpath.
    *
-   * @param conf      Spark configuration.
-   * @param uri       URI to add to classpath (optional).
-   * @param fileName  Alternate name for the file (optional).
-   * @param env       Map holding the environment variables.
+   * @param conf        Spark configuration.
+   * @param hadoopConf  Hadoop configuration.
+   * @param uri         URI to add to classpath (optional).
+   * @param fileName    Alternate name for the file (optional).
+   * @param env         Map holding the environment variables.
    */
   private def addFileToClasspath(
       conf: SparkConf,
+      hadoopConf: Configuration,
       uri: URI,
       fileName: String,
       env: HashMap[String, String]): Unit = {
@@ -1207,6 +1210,11 @@ object Client extends Logging {
     } else if (fileName != null) {
       addClasspathEntry(buildPath(
         YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
+    } else if (uri != null) {
+      val localPath = getQualifiedLocalPath(uri, hadoopConf)
+      val linkName = Option(uri.getFragment()).getOrElse(localPath.getName())
+      addClasspathEntry(buildPath(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env)
     }
   }