Commit 744788f

Author: Davies Liu (committed)
Merge branch 'master' of github.com:apache/spark into string

Conflicts:
    sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
    sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
    sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

2 parents b04a19c + c5b0b29, commit 744788f

194 files changed: +5130 −1549 lines


R/pkg/inst/tests/test_binaryFile.R
Lines changed: 2 additions & 2 deletions

@@ -27,7 +27,7 @@ test_that("saveAsObjectFile()/objectFile() following textFile() works", {
   fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
   writeLines(mockFile, fileName1)

-  rdd <- textFile(sc, fileName1)
+  rdd <- textFile(sc, fileName1, 1)
   saveAsObjectFile(rdd, fileName2)
   rdd <- objectFile(sc, fileName2)
   expect_equal(collect(rdd), as.list(mockFile))
@@ -40,7 +40,7 @@ test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
   fileName <- tempfile(pattern="spark-test", fileext=".tmp")

   l <- list(1, 2, 3)
-  rdd <- parallelize(sc, l)
+  rdd <- parallelize(sc, l, 1)
   saveAsObjectFile(rdd, fileName)
   rdd <- objectFile(sc, fileName)
   expect_equal(collect(rdd), l)

R/pkg/inst/tests/test_textFile.R
Lines changed: 2 additions & 2 deletions

@@ -81,7 +81,7 @@ test_that("textFile() followed by a saveAsTextFile() returns the same content",
   fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
   writeLines(mockFile, fileName1)

-  rdd <- textFile(sc, fileName1)
+  rdd <- textFile(sc, fileName1, 1L)
   saveAsTextFile(rdd, fileName2)
   rdd <- textFile(sc, fileName2)
   expect_equal(collect(rdd), as.list(mockFile))
@@ -93,7 +93,7 @@ test_that("textFile() followed by a saveAsTextFile() returns the same content",
 test_that("saveAsTextFile() on a parallelized list works as expected", {
   fileName <- tempfile(pattern="spark-test", fileext=".tmp")
   l <- list(1, 2, 3)
-  rdd <- parallelize(sc, l)
+  rdd <- parallelize(sc, l, 1L)
   saveAsTextFile(rdd, fileName)
   rdd <- textFile(sc, fileName)
   expect_equal(collect(rdd), lapply(l, function(x) {toString(x)}))

core/src/main/scala/org/apache/spark/HttpServer.scala
Lines changed: 1 addition & 1 deletion

@@ -160,7 +160,7 @@ private[spark] class HttpServer(
       throw new ServerStateException("Server is not started")
     } else {
       val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
-      s"$scheme://${Utils.localIpAddress}:$port"
+      s"$scheme://${Utils.localHostNameForURI()}:$port"
     }
   }
 }

core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
Lines changed: 1 addition & 1 deletion

@@ -53,7 +53,7 @@ class LocalSparkCluster(
     /* Start the Master */
     val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
     masterActorSystems += masterSystem
-    val masterUrl = "spark://" + localHostname + ":" + masterPort
+    val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort
     val masters = Array(masterUrl)

     /* Start the Workers */

core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
Lines changed: 1 addition & 1 deletion

@@ -46,7 +46,7 @@ private[spark] object TestClient {
   def main(args: Array[String]) {
     val url = args(0)
     val conf = new SparkConf
-    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localHostName(), 0,
       conf = conf, securityManager = new SecurityManager(conf))
     val desc = new ApplicationDescription("TestClient", Some(1), 512,
       Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
Lines changed: 1 addition & 1 deletion

@@ -50,7 +50,7 @@ private[deploy] class ExecutorRunner(
     val workerUrl: String,
     conf: SparkConf,
     val appLocalDirs: Seq[String],
-    var state: ExecutorState.Value)
+    @volatile var state: ExecutorState.Value)
   extends Logging {

   private val fullId = appId + "/" + execId
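Note on the `@volatile` above: presumably `state` is written and read from different threads inside the worker, so the annotation is what guarantees the latest write is visible to readers. Below is a minimal sketch of that visibility issue, using a made-up `VolatileFlagDemo` object rather than Spark's classes; it is an illustration under that assumption, not the commit's own code.

// Sketch: one thread flips a flag, another polls it. Without @volatile the
// JVM may cache the field per thread and the poller could spin forever.
object VolatileFlagDemo {
  @volatile private var stopped: Boolean = false

  def main(args: Array[String]): Unit = {
    val poller = new Thread(new Runnable {
      def run(): Unit = {
        while (!stopped) {}                 // spins until the write below becomes visible
        println("observed stopped = true")
      }
    })
    poller.start()

    Thread.sleep(100)
    stopped = true                          // written on the main thread, read by `poller`
    poller.join()
  }
}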

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ import org.apache.spark.shuffle.ShuffleWriter
  * See [[org.apache.spark.scheduler.Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param taskBinary broadcast version of of the RDD and the ShuffleDependency. Once deserialized,
+ * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
  *                   the type should be (RDD[_], ShuffleDependency[_, _, _]).
  * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling

core/src/main/scala/org/apache/spark/ui/WebUI.scala
Lines changed: 1 addition & 1 deletion

@@ -48,7 +48,7 @@ private[spark] abstract class WebUI(
   protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
   protected var serverInfo: Option[ServerInfo] = None
-  protected val localHostName = Utils.localHostName()
+  protected val localHostName = Utils.localHostNameForURI()
   protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
   private val className = Utils.getFormattedClassName(this)
core/src/main/scala/org/apache/spark/util/Utils.scala
Lines changed: 21 additions & 13 deletions

@@ -34,6 +34,7 @@ import scala.util.Try
 import scala.util.control.{ControlThrowable, NonFatal}

 import com.google.common.io.{ByteStreams, Files}
+import com.google.common.net.InetAddresses
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
@@ -789,13 +790,12 @@ private[spark] object Utils extends Logging {
    * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
    * Note, this is typically not used from within core spark.
    */
-  lazy val localIpAddress: String = findLocalIpAddress()
-  lazy val localIpAddressHostname: String = getAddressHostName(localIpAddress)
+  private lazy val localIpAddress: InetAddress = findLocalInetAddress()

-  private def findLocalIpAddress(): String = {
+  private def findLocalInetAddress(): InetAddress = {
     val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
     if (defaultIpOverride != null) {
-      defaultIpOverride
+      InetAddress.getByName(defaultIpOverride)
     } else {
       val address = InetAddress.getLocalHost
       if (address.isLoopbackAddress) {
@@ -806,23 +806,28 @@ private[spark] object Utils extends Logging {
         // It's more proper to pick ip address following system output order.
         val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.toList
         val reOrderedNetworkIFs = if (isWindows) activeNetworkIFs else activeNetworkIFs.reverse
+
         for (ni <- reOrderedNetworkIFs) {
-          for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress &&
-            !addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) {
+          val addresses = ni.getInetAddresses.toList
+            .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress)
+          if (addresses.nonEmpty) {
+            val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
+            // because of Inet6Address.toHostName may add interface at the end if it knows about it
+            val strippedAddress = InetAddress.getByAddress(addr.getAddress)
             // We've found an address that looks reasonable!
             logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
-              " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress +
-              " instead (on interface " + ni.getName + ")")
+              " a loopback address: " + address.getHostAddress + "; using " +
+              strippedAddress.getHostAddress + " instead (on interface " + ni.getName + ")")
             logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
-            return addr.getHostAddress
+            return strippedAddress
           }
         }
         logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
           " a loopback address: " + address.getHostAddress + ", but we couldn't find any" +
           " external IP address!")
         logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
       }
-      address.getHostAddress
+      address
     }
   }
@@ -842,11 +847,14 @@ private[spark] object Utils extends Logging {
    * Get the local machine's hostname.
    */
   def localHostName(): String = {
-    customHostname.getOrElse(localIpAddressHostname)
+    customHostname.getOrElse(localIpAddress.getHostAddress)
   }

-  def getAddressHostName(address: String): String = {
-    InetAddress.getByName(address).getHostName
+  /**
+   * Get the local machine's URI.
+   */
+  def localHostNameForURI(): String = {
+    customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
   }

   def checkHost(host: String, message: String = "") {
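For context on the new `localHostNameForURI()`: it renders the resolved `InetAddress` with Guava's `InetAddresses.toUriString`, which brackets IPv6 literals so they can be spliced into `spark://` or `http://` URLs, whereas `getHostAddress` returns the bare address. A small illustrative sketch follows; the sample addresses and the `UriStringDemo` object are made up, and the expected outputs in comments are my assumptions about Guava's behaviour.

// Sketch: IPv4 passes through unchanged, IPv6 literals get bracketed for URI use.
import java.net.InetAddress
import com.google.common.net.InetAddresses

object UriStringDemo {
  def main(args: Array[String]): Unit = {
    val v4 = InetAddress.getByName("192.168.1.10")
    val v6 = InetAddress.getByName("2001:db8::1")

    println(InetAddresses.toUriString(v4))   // 192.168.1.10
    println(InetAddresses.toUriString(v6))   // [2001:db8::1]
    println(v6.getHostAddress)               // expanded form, no brackets

    // A URL built from the bracketed form parses cleanly:
    println(s"spark://${InetAddresses.toUriString(v6)}:7077")
  }
}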

core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
Lines changed: 15 additions & 15 deletions

@@ -27,19 +27,20 @@ import org.scalatest.Matchers
 class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext {


-  implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
-    def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
-      t1 ++= t2
-      t1
-    }
-    def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
-      t1 += t2
-      t1
-    }
-    def zero(t: mutable.Set[A]) : mutable.Set[A] = {
-      new mutable.HashSet[A]()
+  implicit def setAccum[A]: AccumulableParam[mutable.Set[A], A] =
+    new AccumulableParam[mutable.Set[A], A] {
+      def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
+        t1 ++= t2
+        t1
+      }
+      def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
+        t1 += t2
+        t1
+      }
+      def zero(t: mutable.Set[A]) : mutable.Set[A] = {
+        new mutable.HashSet[A]()
+      }
     }
-  }

   test ("basic accumulation"){
     sc = new SparkContext("local", "test")
@@ -49,11 +50,10 @@ class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext {
     d.foreach{x => acc += x}
     acc.value should be (210)

-
-    val longAcc = sc.accumulator(0l)
+    val longAcc = sc.accumulator(0L)
     val maxInt = Integer.MAX_VALUE.toLong
     d.foreach{x => longAcc += maxInt + x}
-    longAcc.value should be (210l + maxInt * 20)
+    longAcc.value should be (210L + maxInt * 20)
   }

   test ("value not assignable from tasks") {
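The change above only gives the implicit `setAccum` an explicit result type and switches the `0l` literals to `0L`; behaviour is unchanged. For context, here is a hedged sketch of how such an `AccumulableParam` is consumed through the pre-2.0 `sc.accumulable` API. The `SetAccumulatorDemo` object and its local copy of the implicit are illustrative, not part of the commit.

// Sketch: sc.accumulable summons the implicit AccumulableParam and uses
// zero/addAccumulator/addInPlace to merge per-task updates on the driver.
import scala.collection.mutable
import org.apache.spark.{Accumulable, AccumulableParam, SparkConf, SparkContext}

object SetAccumulatorDemo {
  // Same shape as the suite's setAccum.
  implicit def setAccum[A]: AccumulableParam[mutable.Set[A], A] =
    new AccumulableParam[mutable.Set[A], A] {
      def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]): mutable.Set[A] = { t1 ++= t2; t1 }
      def addAccumulator(t1: mutable.Set[A], t2: A): mutable.Set[A] = { t1 += t2; t1 }
      def zero(t: mutable.Set[A]): mutable.Set[A] = new mutable.HashSet[A]()
    }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))
    // The implicit setAccum[Int] is resolved here.
    val seen: Accumulable[mutable.Set[Int], Int] = sc.accumulable(mutable.Set[Int]())
    sc.parallelize(1 to 5).foreach(x => seen += x)
    println(seen.value)   // e.g. Set(1, 2, 3, 4, 5)
    sc.stop()
  }
}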
