30 changes: 30 additions & 0 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -244,6 +244,36 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {
}
}

object AccumulatorParam {

// The following implicit objects were in SparkContext before 1.2 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, duplicate code is kept in SparkContext for backward
// compatibility, so please update it accordingly if you modify the following implicit objects.

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
}

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int) = 0
}

implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long) = t1 + t2
def zero(initialValue: Long) = 0L
}

implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float) = t1 + t2
def zero(initialValue: Float) = 0f
}

// TODO: Add AccumulatorParams for other types, e.g. lists and strings
}

// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private object Accumulators {
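For illustration, here is a minimal sketch of what this change enables (a hypothetical driver program; the app name and local master are made up). With the implicit objects in the AccumulatorParam companion object, the compiler resolves IntAccumulatorParam without `import SparkContext._`:

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("acc-demo").setMaster("local"))
    // AccumulatorParam.IntAccumulatorParam is found implicitly; no extra import needed.
    val acc = sc.accumulator(0)
    sc.parallelize(1 to 100).foreach(x => acc += x)
    println(acc.value) // 5050
    sc.stop()
  }
}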
161 changes: 120 additions & 41 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1427,47 +1427,74 @@ object SparkContext extends Logging {

private[spark] val DRIVER_IDENTIFIER = "<driver>"

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
// The following deprecated objects have already been copied to `object AccumulatorParam` to
// make the compiler find them automatically. They are kept here only for backward
// compatibility; please update `object AccumulatorParam` accordingly if you plan to modify the
// following ones.

@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.2.0")
object DoubleAccumulatorParam extends AccumulatorParam[Double] {
Contributor: Does this provide binary compatibility for Spark programs compiled against earlier versions of Spark?

Member (Author): Yes. I mentioned it in the description of this PR.

Contributor: Do you mind updating the deprecation message to say "Replaced by implicit objects in AccumulatorParam. This is kept here only for backward binary compatibility." Do it for all the following.

def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
}

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.2.0")
object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int) = 0
}

implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.2.0")
object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long) = t1 + t2
def zero(initialValue: Long) = 0L
}

implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.2.0")
object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float) = t1 + t2
def zero(initialValue: Float) = 0f
}

// TODO: Add AccumulatorParams for other types, e.g. lists and strings
// The following deprecated functions have already been moved to `object RDD` to
// make the compiler find them automatically. They are still kept here for backward compatibility
// and just call the corresponding functions in `object RDD`.

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.2.0")
def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
new PairRDDFunctions(rdd)
RDD.rddToPairRDDFunctions(rdd)
}

implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.2.0")
def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = RDD.rddToAsyncRDDActions(rdd)

implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.2.0")
def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)
RDD.rddToSequenceFileRDDFunctions(rdd)

implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.2.0")
def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
rdd: RDD[(K, V)]) =
new OrderedRDDFunctions[K, V, (K, V)](rdd)
RDD.rddToOrderedRDDFunctions(rdd)

implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.2.0")
def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = RDD.doubleRDDToDoubleRDDFunctions(rdd)

implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.2.0")
def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
RDD.numericRDDToDoubleRDDFunctions(rdd)

// Implicit conversions to common Writable types, for saveAsSequenceFile

@@ -1493,40 +1520,49 @@ object SparkContext extends Logging {
arr.map(x => anyToWritable(x)).toArray)
}

// Helper objects for converting common types to Writable
private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
: WritableConverter[T] = {
val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}
// The following deprecated functions have already been moved to `object WritableConverter` to
// make the compiler find them automatically. They are still kept here for backward compatibility
// and just call the corresponding functions in `object WritableConverter`.

implicit def intWritableConverter(): WritableConverter[Int] =
simpleWritableConverter[Int, IntWritable](_.get)
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def intWritableConverter(): WritableConverter[Int] =
WritableConverter.intWritableConverter()

implicit def longWritableConverter(): WritableConverter[Long] =
simpleWritableConverter[Long, LongWritable](_.get)
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def longWritableConverter(): WritableConverter[Long] =
WritableConverter.longWritableConverter()

implicit def doubleWritableConverter(): WritableConverter[Double] =
simpleWritableConverter[Double, DoubleWritable](_.get)
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def doubleWritableConverter(): WritableConverter[Double] =
WritableConverter.doubleWritableConverter()

implicit def floatWritableConverter(): WritableConverter[Float] =
simpleWritableConverter[Float, FloatWritable](_.get)
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def floatWritableConverter(): WritableConverter[Float] =
WritableConverter.floatWritableConverter()

implicit def booleanWritableConverter(): WritableConverter[Boolean] =
simpleWritableConverter[Boolean, BooleanWritable](_.get)
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def booleanWritableConverter(): WritableConverter[Boolean] =
WritableConverter.booleanWritableConverter()

implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
simpleWritableConverter[Array[Byte], BytesWritable](bw =>
// getBytes method returns an array which may be longer than the data to be returned
Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
)
}
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def bytesWritableConverter(): WritableConverter[Array[Byte]] =
WritableConverter.bytesWritableConverter()

implicit def stringWritableConverter(): WritableConverter[String] =
simpleWritableConverter[String, Text](_.toString)
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def stringWritableConverter(): WritableConverter[String] =
WritableConverter.stringWritableConverter()

implicit def writableWritableConverter[T <: Writable]() =
new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.2.0")
def writableWritableConverter[T <: Writable]() =
WritableConverter.writableWritableConverter()

/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
@@ -1750,3 +1786,46 @@ private[spark] class WritableConverter[T](
val writableClass: ClassTag[T] => Class[_ <: Writable],
val convert: Writable => T)
extends Serializable

object WritableConverter {

// Helper objects for converting common types to Writable
private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
: WritableConverter[T] = {
val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}

// The following implicit functions were in SparkContext before 1.2 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. The old functions are kept in SparkContext for backward
// compatibility and simply forward to the functions here.

implicit def intWritableConverter(): WritableConverter[Int] =
simpleWritableConverter[Int, IntWritable](_.get)

implicit def longWritableConverter(): WritableConverter[Long] =
simpleWritableConverter[Long, LongWritable](_.get)

implicit def doubleWritableConverter(): WritableConverter[Double] =
simpleWritableConverter[Double, DoubleWritable](_.get)

implicit def floatWritableConverter(): WritableConverter[Float] =
simpleWritableConverter[Float, FloatWritable](_.get)

implicit def booleanWritableConverter(): WritableConverter[Boolean] =
simpleWritableConverter[Boolean, BooleanWritable](_.get)

implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
simpleWritableConverter[Array[Byte], BytesWritable](bw =>
// getBytes method returns an array which may be longer than the data to be returned
Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
)
}

implicit def stringWritableConverter(): WritableConverter[String] =
simpleWritableConverter[String, Text](_.toString)

implicit def writableWritableConverter[T <: Writable]() =
new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
}
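Similarly, a sketch of the reading path (the file path and setup are hypothetical): the key and value WritableConverters for sequenceFile now resolve from the WritableConverter companion object, again with no `import SparkContext._`:

import org.apache.spark.{SparkConf, SparkContext}

object SequenceFileDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("seq-demo").setMaster("local"))
    // intWritableConverter() and stringWritableConverter() are supplied
    // implicitly from object WritableConverter.
    val pairs = sc.sequenceFile[Int, String]("/tmp/pairs.seq")
    pairs.take(5).foreach(println)
    sc.stop()
  }
}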
@@ -32,13 +32,13 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

@@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.AccumulatorParam._
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
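To illustrate the backward-compatibility question raised in the review thread above, a hedged sketch of pre-1.2 user code (names illustrative): the wildcard import still compiles, implicit resolution simply goes through the new companion-object implicits, and the kept forwarders preserve binary compatibility for already-compiled programs.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pre-1.2 style import: still legal, now redundant

object BackCompatDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("compat-demo").setMaster("local"))
    // Implicit resolution now goes through AccumulatorParam's companion object;
    // the deprecated copies in SparkContext exist only so that programs compiled
    // against earlier releases keep linking.
    val acc = sc.accumulator(0.0)
    sc.parallelize(1 to 10).foreach(i => acc += i.toDouble)
    println(acc.value) // 55.0
    sc.stop()
  }
}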
30 changes: 30 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -21,13 +21,15 @@ import java.util.{Properties, Random}

import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.TextOutputFormat

import org.apache.spark._
@@ -1383,3 +1385,31 @@ abstract class RDD[T: ClassTag](
new JavaRDD(this)(elementClassTag)
}
}

object RDD {

// The following implicit functions were in SparkContext before 1.2 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. The old functions are kept in SparkContext for backward
// compatibility and simply forward to the functions here.

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
new PairRDDFunctions(rdd)
}

implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)

implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)

implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
rdd: RDD[(K, V)]) =
new OrderedRDDFunctions[K, V, (K, V)](rdd)

implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)

implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
}
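Finally, a sketch of the pair-RDD side (local mode, illustrative data): reduceByKey is added by rddToPairRDDFunctions, which the compiler now finds in this companion object automatically:

import org.apache.spark.{SparkConf, SparkContext}

object PairRDDDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pair-demo").setMaster("local"))
    // RDD.rddToPairRDDFunctions supplies reduceByKey; no import required.
    val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).reduceByKey(_ + _)
    counts.collect().foreach(println) // (a,4) and (b,2)
    sc.stop()
  }
}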
@@ -29,7 +29,7 @@ class SparkContextSuite extends FunSuite {
bytesWritable.set(inputArray, 0, 10)
bytesWritable.set(inputArray, 0, 5)

val converter = SparkContext.bytesWritableConverter()
val converter = WritableConverter.bytesWritableConverter()
val byteArray = converter.convert(bytesWritable)
assert(byteArray.length === 5)
