diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R index 78c7a3037ffac..a294fc4fbc75f 100644 --- a/R/pkg/R/client.R +++ b/R/pkg/R/client.R @@ -48,7 +48,7 @@ generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, pack jars <- paste("--jars", jars) } - if (packages != "") { + if (!identical(packages, "")) { packages <- paste("--packages", packages) } diff --git a/R/pkg/inst/tests/test_client.R b/R/pkg/inst/tests/test_client.R index 30b05c1a2afcd..8a20991f89af8 100644 --- a/R/pkg/inst/tests/test_client.R +++ b/R/pkg/inst/tests/test_client.R @@ -30,3 +30,7 @@ test_that("no package specified doesn't add packages flag", { expect_equal(gsub("[[:space:]]", "", args), "") }) + +test_that("multiple packages don't produce a warning", { + expect_that(generateSparkSubmitArgs("", "", "", "", c("A", "B")), not(gives_warning())) +}) diff --git a/build/sbt-launch-lib.bash b/build/sbt-launch-lib.bash index 504be48b358fa..7930a38b9674a 100755 --- a/build/sbt-launch-lib.bash +++ b/build/sbt-launch-lib.bash @@ -51,9 +51,13 @@ acquire_sbt_jar () { printf "Attempting to fetch sbt\n" JAR_DL="${JAR}.part" if [ $(command -v curl) ]; then - (curl --silent ${URL1} > "${JAR_DL}" || curl --silent ${URL2} > "${JAR_DL}") && mv "${JAR_DL}" "${JAR}" + (curl --fail --location --silent ${URL1} > "${JAR_DL}" ||\ + (rm -f "${JAR_DL}" && curl --fail --location --silent ${URL2} > "${JAR_DL}")) &&\ + mv "${JAR_DL}" "${JAR}" elif [ $(command -v wget) ]; then - (wget --quiet ${URL1} -O "${JAR_DL}" || wget --quiet ${URL2} -O "${JAR_DL}") && mv "${JAR_DL}" "${JAR}" + (wget --quiet ${URL1} -O "${JAR_DL}" ||\ + (rm -f "${JAR_DL}" && wget --quiet ${URL2} -O "${JAR_DL}")) &&\ + mv "${JAR_DL}" "${JAR}" else printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n" exit -1 diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 82889bcd30988..6c8c5f68e2ac0 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -56,7 +56,7 @@ object Partitioner { */ def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse - for (r <- bySize if r.partitioner.isDefined) { + for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) { return r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism")) { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index fccceb3ea528b..821c4304b5889 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -532,50 +532,24 @@ private[master] class Master( } /** - * Schedule executors to be launched on the workers. - * - * There are two modes of launching executors. The first attempts to spread out an application's - * executors on as many workers as possible, while the second does the opposite (i.e. launch them - * on as few workers as possible). The former is usually better for data locality purposes and is - * the default. - * - * The number of cores assigned to each executor is configurable. When this is explicitly set, - * multiple executors from the same application may be launched on the same worker if the worker - * has enough cores and memory. Otherwise, each executor grabs all the cores available on the - * worker by default, in which case only one executor may be launched on each worker. + * Schedule and launch executors on workers */ private def startExecutorsOnWorkers(): Unit = { // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app // in the queue, then the second app, etc. - if (spreadOutApps) { - // Try to spread out each app among all the workers, until it has all its cores - for (app <- waitingApps if app.coresLeft > 0) { - val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && - worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1)) - .sortBy(_.coresFree).reverse - val numUsable = usableWorkers.length - val assigned = new Array[Int](numUsable) // Number of cores to give on each node - var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) - var pos = 0 - while (toAssign > 0) { - if (usableWorkers(pos).coresFree - assigned(pos) > 0) { - toAssign -= 1 - assigned(pos) += 1 - } - pos = (pos + 1) % numUsable - } - // Now that we've decided how many cores to give on each node, let's actually give them - for (pos <- 0 until numUsable if assigned(pos) > 0) { - allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos)) - } - } - } else { - // Pack each app into as few workers as possible until we've assigned all its cores - for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { - for (app <- waitingApps if app.coresLeft > 0) { - allocateWorkerResourceToExecutors(app, app.coresLeft, worker) - } + for (app <- waitingApps if app.coresLeft > 0) { + val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor + // Filter out workers that don't have enough resources to launch an executor + val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) + .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && + worker.coresFree >= coresPerExecutor.getOrElse(1)) + .sortBy(_.coresFree).reverse + val assignedCores = Master.scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) + + // Now that we've decided how many cores to allocate on each worker, let's allocate them + for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { + allocateWorkerResourceToExecutors( + app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) } } } @@ -583,19 +557,22 @@ private[master] class Master( /** * Allocate a worker's resources to one or more executors. * @param app the info of the application which the executors belong to - * @param coresToAllocate cores on this worker to be allocated to this application + * @param assignedCores number of cores on this worker for this application + * @param coresPerExecutor number of cores per executor * @param worker the worker info */ private def allocateWorkerResourceToExecutors( app: ApplicationInfo, - coresToAllocate: Int, + assignedCores: Int, + coresPerExecutor: Option[Int], worker: WorkerInfo): Unit = { - val memoryPerExecutor = app.desc.memoryPerExecutorMB - val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate) - var coresLeft = coresToAllocate - while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) { - val exec = app.addExecutor(worker, coresPerExecutor) - coresLeft -= coresPerExecutor + // If the number of cores per executor is specified, we divide the cores assigned + // to this worker evenly among the executors with no remainder. + // Otherwise, we launch a single executor that grabs all the assignedCores on this worker. + val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1) + val coresToAssign = coresPerExecutor.getOrElse(assignedCores) + for (i <- 1 to numExecutors) { + val exec = app.addExecutor(worker, coresToAssign) launchExecutor(worker, exec) app.state = ApplicationState.RUNNING } @@ -892,7 +869,7 @@ private[master] class Master( private[deploy] object Master extends Logging { val systemName = "sparkMaster" - private val actorName = "Master" + val actorName = "Master" def main(argStrings: Array[String]) { SignalLogger.register(log) @@ -944,4 +921,74 @@ private[deploy] object Master extends Logging { val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse] (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort) } + + + /** + * Schedule executors to be launched on the workers. + * Returns an array containing number of cores assigned to each worker. + * + * There are two modes of launching executors. The first attempts to spread out an application's + * executors on as many workers as possible, while the second does the opposite (i.e. launch them + * on as few workers as possible). The former is usually better for data locality purposes and is + * the default. + * + * The number of cores assigned to each executor is configurable. When this is explicitly set, + * multiple executors from the same application may be launched on the same worker if the worker + * has enough cores and memory. Otherwise, each executor grabs all the cores available on the + * worker by default, in which case only one executor may be launched on each worker. + * + * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core + * at a time). Consider the following example: cluster has 4 workers with 16 cores each. + * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is + * allocated at a time, 12 cores from each worker would be assigned to each executor. + * Since 12 < 16, no executors would launch [SPARK-8881]. + * + * Unfortunately, this must be moved out here into the Master object because Akka allows + * neither creating actors outside of Props nor accessing the Master after setting up the + * actor system. Otherwise, there is no way to test it. + */ + def scheduleExecutorsOnWorkers( + app: ApplicationInfo, + usableWorkers: Array[WorkerInfo], + spreadOutApps: Boolean): Array[Int] = { + // If the number of cores per executor is not specified, then we can just schedule + // 1 core at a time since we expect a single executor to be launched on each worker + val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) + val memoryPerExecutor = app.desc.memoryPerExecutorMB + val numUsable = usableWorkers.length + val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker + val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker + var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) + var freeWorkers = (0 until numUsable).toIndexedSeq + + def canLaunchExecutor(pos: Int): Boolean = { + usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor && + usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor + } + + while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) { + freeWorkers = freeWorkers.filter(canLaunchExecutor) + freeWorkers.foreach { pos => + var keepScheduling = true + while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) { + coresToAssign -= coresPerExecutor + assignedCores(pos) += coresPerExecutor + // If cores per executor is not set, we are assigning 1 core at a time + // without actually meaning to launch 1 executor for each core assigned + if (app.desc.coresPerExecutor.isDefined) { + assignedMemory(pos) += memoryPerExecutor + } + + // Spreading out an application means spreading out its executors across as + // many workers as possible. If we are not spreading out, then we should keep + // scheduling executors on this worker until we use all of its resources. + // Otherwise, just move on to the next worker. + if (spreadOutApps) { + keepScheduling = false + } + } + } + } + assignedCores + } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5f132410540fd..1e1872e5788a6 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -430,11 +430,11 @@ private[spark] object Utils extends Logging { val lockFileName = s"${url.hashCode}${timestamp}_lock" val localDir = new File(getLocalDir(conf)) val lockFile = new File(localDir, lockFileName) - val raf = new RandomAccessFile(lockFile, "rw") + val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel() // Only one executor entry. // The FileLock is only used to control synchronization for executors download file, // it's always safe regardless of lock type (mandatory or advisory). - val lock = raf.getChannel().lock() + val lock = lockFileChannel.lock() val cachedFile = new File(localDir, cachedFileName) try { if (!cachedFile.exists()) { @@ -442,6 +442,7 @@ private[spark] object Utils extends Logging { } } finally { lock.release() + lockFileChannel.close() } copyFile( url, diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 014e87bb40254..3eabc1aa1cf2d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -184,4 +184,174 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually { } } + test("basic scheduling - spread out") { + testBasicScheduling(spreadOut = true) + } + + test("basic scheduling - no spread out") { + testBasicScheduling(spreadOut = false) + } + + test("scheduling with max cores - spread out") { + testSchedulingWithMaxCores(spreadOut = true) + } + + test("scheduling with max cores - no spread out") { + testSchedulingWithMaxCores(spreadOut = false) + } + + test("scheduling with cores per executor - spread out") { + testSchedulingWithCoresPerExecutor(spreadOut = true) + } + + test("scheduling with cores per executor - no spread out") { + testSchedulingWithCoresPerExecutor(spreadOut = false) + } + + test("scheduling with cores per executor AND max cores - spread out") { + testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true) + } + + test("scheduling with cores per executor AND max cores - no spread out") { + testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false) + } + + private def testBasicScheduling(spreadOut: Boolean): Unit = { + val appInfo = makeAppInfo(1024) + val workerInfo = makeWorkerInfo(4096, 10) + val workerInfos = Array(workerInfo, workerInfo, workerInfo) + val scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + assert(scheduledCores(0) === 10) + assert(scheduledCores(1) === 10) + assert(scheduledCores(2) === 10) + } + + private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = { + val appInfo1 = makeAppInfo(1024, maxCores = Some(8)) + val appInfo2 = makeAppInfo(1024, maxCores = Some(16)) + val workerInfo = makeWorkerInfo(4096, 10) + val workerInfos = Array(workerInfo, workerInfo, workerInfo) + var scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + // With spreading out, each worker should be assigned a few cores + if (spreadOut) { + assert(scheduledCores(0) === 3) + assert(scheduledCores(1) === 3) + assert(scheduledCores(2) === 2) + } else { + // Without spreading out, the cores should be concentrated on the first worker + assert(scheduledCores(0) === 8) + assert(scheduledCores(1) === 0) + assert(scheduledCores(2) === 0) + } + // Now test the same thing with max cores > cores per worker + scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + if (spreadOut) { + assert(scheduledCores(0) === 6) + assert(scheduledCores(1) === 5) + assert(scheduledCores(2) === 5) + } else { + // Without spreading out, the first worker should be fully booked, + // and the leftover cores should spill over to the second worker only. + assert(scheduledCores(0) === 10) + assert(scheduledCores(1) === 6) + assert(scheduledCores(2) === 0) + } + } + + private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = { + val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2)) + val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2)) + val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3)) + val workerInfo = makeWorkerInfo(4096, 10) + val workerInfos = Array(workerInfo, workerInfo, workerInfo) + // Each worker should end up with 4 executors with 2 cores each + // This should be 4 because of the memory restriction on each worker + var scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + assert(scheduledCores(0) === 8) + assert(scheduledCores(1) === 8) + assert(scheduledCores(2) === 8) + // Now test the same thing without running into the worker memory limit + // Each worker should now end up with 5 executors with 2 cores each + scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + assert(scheduledCores(0) === 10) + assert(scheduledCores(1) === 10) + assert(scheduledCores(2) === 10) + // Now test the same thing with a cores per executor that 10 is not divisible by + scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + assert(scheduledCores(0) === 9) + assert(scheduledCores(1) === 9) + assert(scheduledCores(2) === 9) + } + + // Sorry for the long method name! + private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = { + val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4)) + val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20)) + val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20)) + val workerInfo = makeWorkerInfo(4096, 10) + val workerInfos = Array(workerInfo, workerInfo, workerInfo) + // We should only launch two executors, each with exactly 2 cores + var scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + if (spreadOut) { + assert(scheduledCores(0) === 2) + assert(scheduledCores(1) === 2) + assert(scheduledCores(2) === 0) + } else { + assert(scheduledCores(0) === 4) + assert(scheduledCores(1) === 0) + assert(scheduledCores(2) === 0) + } + // Test max cores > number of cores per worker + scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + if (spreadOut) { + assert(scheduledCores(0) === 8) + assert(scheduledCores(1) === 6) + assert(scheduledCores(2) === 6) + } else { + assert(scheduledCores(0) === 10) + assert(scheduledCores(1) === 10) + assert(scheduledCores(2) === 0) + } + // Test max cores > number of cores per worker AND + // a cores per executor that is 10 is not divisible by + scheduledCores = Master.scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut) + assert(scheduledCores.length === 3) + if (spreadOut) { + assert(scheduledCores(0) === 6) + assert(scheduledCores(1) === 6) + assert(scheduledCores(2) === 6) + } else { + assert(scheduledCores(0) === 9) + assert(scheduledCores(1) === 9) + assert(scheduledCores(2) === 0) + } + } + + // =============================== + // | Utility methods for testing | + // =============================== + + private def makeAppInfo( + memoryPerExecutorMb: Int, + coresPerExecutor: Option[Int] = None, + maxCores: Option[Int] = None): ApplicationInfo = { + val desc = new ApplicationDescription( + "test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor) + val appId = System.currentTimeMillis.toString + new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue) + } + + private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = { + val workerId = System.currentTimeMillis.toString + new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address") + } + } diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index dfa102f432a02..1321ec84735b5 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -282,6 +282,29 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { )) } + // See SPARK-9326 + test("cogroup with empty RDD") { + import scala.reflect.classTag + val intPairCT = classTag[(Int, Int)] + + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.emptyRDD[(Int, Int)](intPairCT) + + val joined = rdd1.cogroup(rdd2).collect() + assert(joined.size > 0) + } + + // See SPARK-9326 + test("cogroup with groupByed RDD having 0 partitions") { + import scala.reflect.classTag + val intCT = classTag[Int] + + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.emptyRDD[Int](intCT).groupBy((x) => 5) + val joined = rdd1.cogroup(rdd2).collect() + assert(joined.size > 0) + } + test("rightOuterJoin") { val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index 3f10cb2dc3d2a..99f8c827f767f 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -800,7 +800,7 @@ import org.apache.spark.graphx._ // Import random graph generation library import org.apache.spark.graphx.util.GraphGenerators // A graph with edge attributes containing distances -val graph: Graph[Int, Double] = +val graph: Graph[Long, Double] = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble) val sourceId: VertexId = 42 // The ultimate source // Initialize the graph such that all vertices except the root have distance infinity. diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 07b30bfb87e6b..5290b21ad2e43 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -68,9 +68,9 @@ In YARN terminology, executors and application masters run inside "containers". yarn logs -applicationId -will print out the contents of all log files from all containers from the given application. You can also view the container log files directly in HDFS using the HDFS shell or API. The directory where they are located can be found by looking at your YARN configs (`yarn.nodemanager.remote-app-log-dir` and `yarn.nodemanager.remote-app-log-dir-suffix`). +will print out the contents of all log files from all containers from the given application. You can also view the container log files directly in HDFS using the HDFS shell or API. The directory where they are located can be found by looking at your YARN configs (`yarn.nodemanager.remote-app-log-dir` and `yarn.nodemanager.remote-app-log-dir-suffix`). The logs are also available on the Spark Web UI under the Executors Tab. You need to have both the Spark history server and the MapReduce history server running and configure `yarn.log.server.url` in `yarn-site.xml` properly. The log URL on the Spark history server UI will redirect you to the MapReduce history server to show the aggregated logs. -When log aggregation isn't turned on, logs are retained locally on each machine under `YARN_APP_LOGS_DIR`, which is usually configured to `/tmp/logs` or `$HADOOP_HOME/logs/userlogs` depending on the Hadoop version and installation. Viewing logs for a container requires going to the host that contains them and looking in this directory. Subdirectories organize log files by application ID and container ID. +When log aggregation isn't turned on, logs are retained locally on each machine under `YARN_APP_LOGS_DIR`, which is usually configured to `/tmp/logs` or `$HADOOP_HOME/logs/userlogs` depending on the Hadoop version and installation. Viewing logs for a container requires going to the host that contains them and looking in this directory. Subdirectories organize log files by application ID and container ID. The logs are also available on the Spark Web UI under the Executors Tab and doesn't require running the MapReduce history server. To review per-container launch environment, increase `yarn.nodemanager.delete.debug-delay-sec` to a large value (e.g. 36000), and then access the application cache through `yarn.nodemanager.local-dirs` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala index e17743a9f0cf4..6bbff81200f0f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala @@ -196,7 +196,7 @@ object UTF8String { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 5, 5, - 6, 6, 6, 6) + 6, 6) /** * Create a UTF-8 String from String diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index a5ca3613c5e00..ed6685f9f1f23 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -1337,11 +1337,11 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C /* Attribute References */ case Token("TOK_TABLE_OR_COL", Token(name, Nil) :: Nil) => - UnresolvedAttribute(cleanIdentifier(name)) + UnresolvedAttribute.quoted(cleanIdentifier(name)) case Token(".", qualifier :: Token(attr, Nil) :: Nil) => nodeToExpr(qualifier) match { - case UnresolvedAttribute(qualifierName) => - UnresolvedAttribute(qualifierName :+ cleanIdentifier(attr)) + case UnresolvedAttribute(nameParts) => + UnresolvedAttribute(nameParts :+ cleanIdentifier(attr)) case other => UnresolvedExtractValue(other, Literal(attr)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index e302f8b249356..13054ad3563c2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -938,4 +938,12 @@ class SQLQuerySuite extends QueryTest { case None => // OK } } + + test("SPARK-9371: fix the support for special chars in column names for hive context") { + TestHive.read.json(TestHive.sparkContext.makeRDD( + """{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil)) + .registerTempTable("t") + + checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1)) + } }