
Commit 65b987c

zsxwing authored and rxin committed
[SPARK-4397][Core] Reorganize `implicit`s to improve the API convenience

This PR moves `implicit`s to `package object`s and `companion object`s so that the Scala compiler finds them automatically, without explicit imports. It should not break any API. A test project for backward compatibility is [here](https://github.com/zsxwing/SPARK-4397-Backforward-Compatibility); it demonstrates that code compiled against Spark 1.1.0 still runs with this PR. To summarize, the changes are:

* Deprecated the old implicit conversion functions: this preserves binary compatibility for code compiled against earlier versions of Spark.
* Removed `implicit` from them so they are just normal functions: this makes sure the compiler doesn't get confused and warn about multiple implicits in scope.
* Created new implicit functions in `object RDD` in the `rdd` package, which is part of the scope that scalac searches when looking for implicit conversions on various RDD objects.

The disadvantage is the duplicated code kept in SparkContext for backward compatibility.

Author: zsxwing <[email protected]>

Closes #3262 from zsxwing/SPARK-4397 and squashes the following commits:

fc30314 [zsxwing] Update the comments
9c27aff [zsxwing] Move implicit functions to object RDD and forward old functions to new implicit ones directly
2b5f5a4 [zsxwing] Comments for the deprecated functions
52353de [zsxwing] Remove private[spark] from object WritableConverter
34641d4 [zsxwing] Move ImplicitSuite to org.apache.sparktest
7266218 [zsxwing] Add comments to warn the duplicate codes in SparkContext
185c12f [zsxwing] Remove simpleWritableConverter from SparkContext
3bdcae2 [zsxwing] Move WritableConverter implicits to object WritableConverter
9b73188 [zsxwing] Fix the code style issue
3ac4f07 [zsxwing] Add license header
1eda9e4 [zsxwing] Reorganize 'implicit's to improve the API convenience
1 parent f1069b8 commit 65b987c
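Why this removes the need for `import SparkContext._`: when scalac needs an implicit conversion involving a type, it searches not only the local scope but also the implicit scope of that type, which includes the type's companion object. A minimal, self-contained sketch of that mechanism, using toy types rather than Spark's API:

import scala.language.implicitConversions

class Box[T](val value: T)

class IntBoxOps(b: Box[Int]) {
  def doubled: Int = b.value * 2
}

object Box {
  // Defined in Box's companion object, so it is part of the implicit scope
  // of Box[Int] and is found without any import at the call site.
  implicit def boxToIntOps(b: Box[Int]): IntBoxOps = new IntBoxOps(b)
}

object Demo extends App {
  val b = new Box(21)
  println(b.doubled)  // 42 -- no `import Box._` required
}

This is exactly the trick the PR applies to `RDD`, `AccumulatorParam`, and `WritableConverter` in the diffs below.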

File tree

7 files changed: +311 -44 lines changed

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 30 additions & 0 deletions

@@ -244,6 +244,36 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {
   }
 }
 
+object AccumulatorParam {
+
+  // The following implicit objects were in SparkContext before 1.2 and users had to
+  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
+  // them automatically. However, as there are duplicate codes in SparkContext for backward
+  // compatibility, please update them accordingly if you modify the following implicit objects.
+
+  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
+    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
+    def zero(initialValue: Double) = 0.0
+  }
+
+  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
+    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
+    def zero(initialValue: Int) = 0
+  }
+
+  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
+    def addInPlace(t1: Long, t2: Long) = t1 + t2
+    def zero(initialValue: Long) = 0L
+  }
+
+  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
+    def addInPlace(t1: Float, t2: Float) = t1 + t2
+    def zero(initialValue: Float) = 0f
+  }
+
+  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+}
+
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
 private object Accumulators {
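With these implicit instances in `AccumulatorParam`'s companion object, creating an accumulator needs no import at all. A hedged sketch of the resulting user experience, assuming an already-constructed SparkContext named `sc` (Spark 1.2 API):

// No `import SparkContext._` needed: IntAccumulatorParam is found in
// AccumulatorParam's companion object when resolving sc.accumulator(0).
val acc = sc.accumulator(0)
sc.parallelize(1 to 10).foreach(x => acc += x)
println(acc.value)  // 55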

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 120 additions & 41 deletions

@@ -1624,47 +1624,74 @@ object SparkContext extends Logging {
 
   private[spark] val DRIVER_IDENTIFIER = "<driver>"
 
-  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
+  // The following deprecated objects have already been copied to `object AccumulatorParam` to
+  // make the compiler find them automatically. They are duplicate codes only for backward
+  // compatibility, please update `object AccumulatorParam` accordingly if you plan to modify the
+  // following ones.
+
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double) = 0.0
   }
 
-  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object IntAccumulatorParam extends AccumulatorParam[Int] {
     def addInPlace(t1: Int, t2: Int): Int = t1 + t2
     def zero(initialValue: Int) = 0
   }
 
-  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object LongAccumulatorParam extends AccumulatorParam[Long] {
     def addInPlace(t1: Long, t2: Long) = t1 + t2
     def zero(initialValue: Long) = 0L
   }
 
-  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object FloatAccumulatorParam extends AccumulatorParam[Float] {
     def addInPlace(t1: Float, t2: Float) = t1 + t2
     def zero(initialValue: Float) = 0f
   }
 
-  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+  // The following deprecated functions have already been moved to `object RDD` to
+  // make the compiler find them automatically. They are still kept here for backward compatibility
+  // and just call the corresponding functions in `object RDD`.
 
-  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
-    new PairRDDFunctions(rdd)
+    RDD.rddToPairRDDFunctions(rdd)
   }
 
-  implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = RDD.rddToAsyncRDDActions(rdd)
 
-  implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
      rdd: RDD[(K, V)]) =
-    new SequenceFileRDDFunctions(rdd)
+    RDD.rddToSequenceFileRDDFunctions(rdd)
 
-  implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
      rdd: RDD[(K, V)]) =
-    new OrderedRDDFunctions[K, V, (K, V)](rdd)
+    RDD.rddToOrderedRDDFunctions(rdd)
 
-  implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = RDD.doubleRDDToDoubleRDDFunctions(rdd)
 
-  implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
-    new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
+    RDD.numericRDDToDoubleRDDFunctions(rdd)
 
   // Implicit conversions to common Writable types, for saveAsSequenceFile
 

@@ -1690,40 +1717,49 @@ object SparkContext extends Logging {
      arr.map(x => anyToWritable(x)).toArray)
   }
 
-  // Helper objects for converting common types to Writable
-  private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
-      : WritableConverter[T] = {
-    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
-    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
-  }
+  // The following deprecated functions have already been moved to `object WritableConverter` to
+  // make the compiler find them automatically. They are still kept here for backward compatibility
+  // and just call the corresponding functions in `object WritableConverter`.
 
-  implicit def intWritableConverter(): WritableConverter[Int] =
-    simpleWritableConverter[Int, IntWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def intWritableConverter(): WritableConverter[Int] =
+    WritableConverter.intWritableConverter()
 
-  implicit def longWritableConverter(): WritableConverter[Long] =
-    simpleWritableConverter[Long, LongWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def longWritableConverter(): WritableConverter[Long] =
+    WritableConverter.longWritableConverter()
 
-  implicit def doubleWritableConverter(): WritableConverter[Double] =
-    simpleWritableConverter[Double, DoubleWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def doubleWritableConverter(): WritableConverter[Double] =
+    WritableConverter.doubleWritableConverter()
 
-  implicit def floatWritableConverter(): WritableConverter[Float] =
-    simpleWritableConverter[Float, FloatWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
    "backward compatibility.", "1.2.0")
+  def floatWritableConverter(): WritableConverter[Float] =
+    WritableConverter.floatWritableConverter()
 
-  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
-    simpleWritableConverter[Boolean, BooleanWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def booleanWritableConverter(): WritableConverter[Boolean] =
+    WritableConverter.booleanWritableConverter()
 
-  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
-    simpleWritableConverter[Array[Byte], BytesWritable](bw =>
-      // getBytes method returns array which is longer then data to be returned
-      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
-    )
-  }
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def bytesWritableConverter(): WritableConverter[Array[Byte]] =
+    WritableConverter.bytesWritableConverter()
 
-  implicit def stringWritableConverter(): WritableConverter[String] =
-    simpleWritableConverter[String, Text](_.toString)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def stringWritableConverter(): WritableConverter[String] =
+    WritableConverter.stringWritableConverter()
 
-  implicit def writableWritableConverter[T <: Writable]() =
-    new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def writableWritableConverter[T <: Writable]() =
+    WritableConverter.writableWritableConverter()
 
   /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass

@@ -1950,3 +1986,46 @@ private[spark] class WritableConverter[T](
    val writableClass: ClassTag[T] => Class[_ <: Writable],
    val convert: Writable => T)
  extends Serializable
+
+object WritableConverter {
+
+  // Helper objects for converting common types to Writable
+  private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
+      : WritableConverter[T] = {
+    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
+    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
+  }
+
+  // The following implicit functions were in SparkContext before 1.2 and users had to
+  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
+  // them automatically. However, we still keep the old functions in SparkContext for backward
+  // compatibility and forward to the following functions directly.
+
+  implicit def intWritableConverter(): WritableConverter[Int] =
+    simpleWritableConverter[Int, IntWritable](_.get)
+
+  implicit def longWritableConverter(): WritableConverter[Long] =
+    simpleWritableConverter[Long, LongWritable](_.get)
+
+  implicit def doubleWritableConverter(): WritableConverter[Double] =
+    simpleWritableConverter[Double, DoubleWritable](_.get)
+
+  implicit def floatWritableConverter(): WritableConverter[Float] =
+    simpleWritableConverter[Float, FloatWritable](_.get)
+
+  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
+    simpleWritableConverter[Boolean, BooleanWritable](_.get)
+
+  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
+    simpleWritableConverter[Array[Byte], BytesWritable](bw =>
+      // getBytes method returns array which is longer then data to be returned
+      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
+    )
+  }
+
+  implicit def stringWritableConverter(): WritableConverter[String] =
+    simpleWritableConverter[String, Text](_.toString)
+
+  implicit def writableWritableConverter[T <: Writable]() =
+    new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
+}
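These converters feed `SparkContext.sequenceFile`, whose implicit parameters in 1.2 have type `() => WritableConverter[K]` and `() => WritableConverter[V]`; the zero-argument `implicit def`s above satisfy them by eta-expansion. A hedged usage sketch, assuming an existing SparkContext named `sc` and a hypothetical path to a SequenceFile of (IntWritable, Text) records:

// The WritableConverter[Int] and WritableConverter[String] factories are now
// resolved from object WritableConverter, with no explicit import.
val pairs = sc.sequenceFile[Int, String]("/tmp/example-seqfile")
pairs.take(5).foreach(println)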

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 1 addition & 1 deletion

@@ -32,13 +32,13 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 
 import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
-import org.apache.spark.SparkContext.rddToPairRDDFunctions
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
 import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
+import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.{InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark._
-import org.apache.spark.SparkContext._
+import org.apache.spark.AccumulatorParam._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 30 additions & 0 deletions

@@ -21,13 +21,15 @@ import java.util.{Properties, Random}
 
 import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
+import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.io.BytesWritable
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.TextOutputFormat
 
 import org.apache.spark._

@@ -1383,3 +1385,31 @@ abstract class RDD[T: ClassTag](
     new JavaRDD(this)(elementClassTag)
   }
 }
+
+object RDD {
+
+  // The following implicit functions were in SparkContext before 1.2 and users had to
+  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
+  // them automatically. However, we still keep the old functions in SparkContext for backward
+  // compatibility and forward to the following functions directly.
+
+  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
+      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
+    new PairRDDFunctions(rdd)
+  }
+
+  implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
+
+  implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
+      rdd: RDD[(K, V)]) =
+    new SequenceFileRDDFunctions(rdd)
+
+  implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
+      rdd: RDD[(K, V)]) =
+    new OrderedRDDFunctions[K, V, (K, V)](rdd)
+
+  implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
+
+  implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
+    new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
+}
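The practical effect of `object RDD` is that pair-RDD syntax now works out of the box. A minimal sketch, again assuming an existing SparkContext named `sc`:

// Compiles with no `import SparkContext._`: rddToPairRDDFunctions is found in
// RDD's companion object, which is in the implicit scope of RDD[(String, Int)].
val counts = sc.parallelize(Seq("a", "b", "a"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.collect().foreach(println)  // (a,2) and (b,1), in some order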

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -82,7 +82,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
     bytesWritable.set(inputArray, 0, 10)
     bytesWritable.set(inputArray, 0, 5)
 
-    val converter = SparkContext.bytesWritableConverter()
+    val converter = WritableConverter.bytesWritableConverter()
     val byteArray = converter.convert(bytesWritable)
     assert(byteArray.length === 5)
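This test exercises a subtlety of Hadoop's BytesWritable that the converter has to handle: `getBytes` returns the whole backing buffer, which can be longer than the valid data (`getLength`), so only the valid prefix may be copied out. A standalone sketch of that behavior:

import java.util.Arrays
import org.apache.hadoop.io.BytesWritable

val bw = new BytesWritable()
bw.set(Array.tabulate[Byte](10)(_.toByte), 0, 10)  // buffer capacity grows to hold 10 bytes
bw.set(Array.tabulate[Byte](5)(_.toByte), 0, 5)    // only 5 bytes are valid now
// bw.getBytes.length can still be 10 (or more), while bw.getLength is 5, which is
// why the converter uses Arrays.copyOfRange rather than using getBytes directly.
val valid = Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
assert(valid.length == 5)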
