
Commit d50c388

Merge remote-tracking branch 'apache/master' into config-cleanup
Conflicts:
    core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
    yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
    yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala

2 parents: a762901 + 7863ecc

File tree: 173 files changed (+4517, -736 lines)


.rat-excludes
Lines changed: 2 additions & 2 deletions

@@ -40,5 +40,5 @@ work
 golden
 test.out/*
 .*iml
-python/metastore/service.properties
-python/metastore/db.lck
+service.properties
+db.lck

bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
Lines changed: 2 additions & 0 deletions

@@ -24,6 +24,8 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark._
 import org.apache.spark.storage.StorageLevel
 
+import scala.language.postfixOps
+
 class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
 class TestMessage(val targetId: String) extends Message[String] with Serializable
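
For context only (not part of the commit): a minimal sketch of the postfix syntax this import enables, assuming ScalaTest's SpanSugar, which the hunk header above already imports.

    import org.scalatest.time.Span
    import org.scalatest.time.SpanSugar._
    import scala.language.postfixOps

    // `10 seconds` invokes `seconds` as a postfix operator on the implicitly
    // wrapped Int; without scala.language.postfixOps in scope the compiler
    // emits a feature warning under -feature.
    val timeout: Span = 10 seconds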

core/pom.xml
Lines changed: 5 additions & 0 deletions

@@ -266,6 +266,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.spark-project</groupId>
+      <artifactId>pyrolite</artifactId>
+      <version>2.0</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

core/src/main/scala/org/apache/spark/SparkContext.scala
Lines changed: 13 additions & 5 deletions

@@ -25,6 +25,7 @@ import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
 import scala.collection.generic.Growable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.language.implicitConversions
 import scala.reflect.{ClassTag, classTag}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -453,14 +454,21 @@ class SparkContext(config: SparkConf) extends Logging {
    * (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are preferred, as each file will be loaded fully in memory.
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minSplits A suggestion value of the minimal splitting number for input data.
    */
-  def wholeTextFiles(path: String): RDD[(String, String)] = {
-    newAPIHadoopFile(
-      path,
+  def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new WholeTextFileRDD(
+      this,
       classOf[WholeTextFileInputFormat],
       classOf[String],
-      classOf[String])
+      classOf[String],
+      updateConf,
+      minSplits)
   }
 
   /**
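
For illustration only (not part of the commit): a minimal sketch of calling the new overload. The application setup and path are made up; minSplits is only a hint passed through to the input format.

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("wholeTextFiles-demo").setMaster("local[2]"))

    // Each element is (file path, entire file content).
    val files = sc.wholeTextFiles("hdfs://namenode/data/small-files", minSplits = 10)
    files.map { case (path, content) => (path, content.length) }
         .collect()
         .foreach(println)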

core/src/main/scala/org/apache/spark/TestUtils.scala
Lines changed: 7 additions & 2 deletions

@@ -100,9 +100,14 @@ private[spark] object TestUtils {
 
     val fileName = className + ".class"
     val result = new File(fileName)
-    if (!result.exists()) throw new Exception("Compiled file not found: " + fileName)
+    assert(result.exists(), "Compiled file not found: " + result.getAbsolutePath())
     val out = new File(destDir, fileName)
-    result.renameTo(out)
+
+    // renameTo cannot handle in and out files in different filesystems
+    // use google's Files.move instead
+    Files.move(result, out)
+
+    assert(out.exists(), "Destination file not moved: " + out.getAbsolutePath())
     out
   }
 }
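
For context only (not part of the commit): a minimal sketch of the Guava call the new code relies on; the temporary files here are illustrative.

    import java.io.File
    import com.google.common.io.Files

    // java.io.File.renameTo returns false (rather than throwing) when source and
    // destination are on different filesystems; Guava's Files.move either succeeds
    // or throws an IOException, so a failed move cannot go unnoticed.
    val src = File.createTempFile("compiled", ".class")
    val dst = new File(Files.createTempDir(), src.getName)
    Files.move(src, dst)
    assert(dst.exists(), "move failed: " + dst.getAbsolutePath)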

core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
Lines changed: 1 addition & 0 deletions

@@ -19,6 +19,7 @@ package org.apache.spark.api.java
 
 import java.lang.{Double => JDouble}
 
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import org.apache.spark.Partitioner

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Lines changed: 1 addition & 0 deletions

@@ -21,6 +21,7 @@ import java.util.{Comparator, List => JList}
 import java.lang.{Iterable => JIterable}
 
 import scala.collection.JavaConversions._
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import com.google.common.base.Optional

core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
Lines changed: 1 addition & 0 deletions

@@ -17,6 +17,7 @@
 
 package org.apache.spark.api.java
 
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import org.apache.spark._

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
Lines changed: 14 additions & 1 deletion

@@ -22,6 +22,7 @@ import java.util.{Map => JMap}
 
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import com.google.common.base.Optional
@@ -177,7 +178,19 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are preferred, as each file will be loaded fully in memory.
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minSplits A suggestion value of the minimal splitting number for input data.
+   */
+  def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] =
+    new JavaPairRDD(sc.wholeTextFiles(path, minSplits))
+
+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each file.
+   *
+   * @see `wholeTextFiles(path: String, minSplits: Int)`.
    */
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))
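
For illustration only (not part of the commit): the Java-facing overload can also be exercised from Scala; the existing SparkContext `sc` and the path below are assumed.

    import org.apache.spark.api.java.JavaSparkContext

    // JavaSparkContext simply forwards the minSplits hint to SparkContext.wholeTextFiles.
    val jsc = new JavaSparkContext(sc)
    val pairs = jsc.wholeTextFiles("hdfs://namenode/data/small-files", 10)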

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
Lines changed: 32 additions & 0 deletions

@@ -25,6 +25,8 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
+import net.razorvine.pickle.{Pickler, Unpickler}
+
 import org.apache.spark._
 import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import org.apache.spark.broadcast.Broadcast
@@ -284,6 +286,36 @@ private[spark] object PythonRDD {
     file.close()
   }
 
+  /**
+   * Convert an RDD of serialized Python dictionaries to Scala Maps
+   * TODO: Support more Python types.
+   */
+  def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
+    pyRDD.rdd.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      // TODO: Figure out why flatMap is necessay for pyspark
+      iter.flatMap { row =>
+        unpickle.loads(row) match {
+          case objs: java.util.ArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
+          // Incase the partition doesn't have a collection
+          case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
+        }
+      }
+    }
+  }
+
+  /**
+   * Convert and RDD of Java objects to and RDD of serialized Python objects, that is usable by
+   * PySpark.
+   */
+  def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
+    jRDD.rdd.mapPartitions { iter =>
+      val pickle = new Pickler
+      iter.map { row =>
+        pickle.dumps(row)
+      }
+    }
+  }
 }
 
 private
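
For context only (not part of the commit): a minimal sketch of the pyrolite round trip these helpers rely on (the pyrolite dependency added in core/pom.xml above provides net.razorvine.pickle). The map contents are made up.

    import java.util.{HashMap => JHashMap}
    import net.razorvine.pickle.{Pickler, Unpickler}

    // A pickled Python dict arrives in the JVM as a java.util.Map, which is the
    // shape that pythonToJavaMap pattern-matches on.
    val pickler = new Pickler
    val unpickler = new Unpickler

    val dict = new JHashMap[String, Any]()
    dict.put("name", "spark")
    dict.put("partitions", 4)

    val bytes: Array[Byte] = pickler.dumps(dict)  // Python pickle bytes
    val restored = unpickler.loads(bytes)         // back as a java.util.Map
    println(restored)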
