
package org.apache.spark

+ import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
- import org.apache.spark.util.CollectionsUtils
- import org.apache.spark.util.Utils
+ import org.apache.spark.serializer.JavaSerializer
+ import org.apache.spark.util.{CollectionsUtils, Utils}

/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
@@ -96,15 +98,15 @@ class HashPartitioner(partitions: Int) extends Partitioner {
 * the value of `partitions`.
 */
class RangePartitioner[K : Ordering : ClassTag, V](
-    partitions: Int,
+    @transient partitions: Int,
    @transient rdd: RDD[_ <: Product2[K, V]],
-    private val ascending: Boolean = true)
+    private var ascending: Boolean = true)
  extends Partitioner {

-  private val ordering = implicitly[Ordering[K]]
+  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
-  private val rangeBounds: Array[K] = {
+  private var rangeBounds: Array[K] = {
    if (partitions == 1) {
      Array()
    } else {
@@ -127,7 +129,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](

  def numPartitions = rangeBounds.length + 1

-  private val binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
+  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
@@ -173,4 +175,40 @@ class RangePartitioner[K : Ordering : ClassTag, V](
    result = prime * result + ascending.hashCode
    result
  }
+
+  @throws(classOf[IOException])
+  private def writeObject(out: ObjectOutputStream) {
+    val sfactory = SparkEnv.get.serializer
+    sfactory match {
+      case js: JavaSerializer => out.defaultWriteObject()
+      case _ =>
+        out.writeBoolean(ascending)
+        out.writeObject(ordering)
+        out.writeObject(binarySearch)
+
+        val ser = sfactory.newInstance()
+        Utils.serializeViaNestedStream(out, ser) { stream =>
+          stream.writeObject(scala.reflect.classTag[Array[K]])
+          stream.writeObject(rangeBounds)
+        }
+    }
+  }
+
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream) {
+    val sfactory = SparkEnv.get.serializer
+    sfactory match {
+      case js: JavaSerializer => in.defaultReadObject()
+      case _ =>
+        ascending = in.readBoolean()
+        ordering = in.readObject().asInstanceOf[Ordering[K]]
+        binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]
+
+        val ser = sfactory.newInstance()
+        Utils.deserializeViaNestedStream(in, ser) { ds =>
+          implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
+          rangeBounds = ds.readObject[Array[K]]()
+        }
+    }
+  }
}
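
For context (not part of this commit): a minimal usage sketch of how a RangePartitioner is typically applied to a pair RDD, which is the path that exercises the custom writeObject/readObject above when the partitioner is shipped to executors. The app name, master URL, and sample data below are illustrative assumptions, not taken from this diff.

```scala
import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}
// Brings in the implicit pair-RDD conversions (needed on pre-1.3 Spark).
import org.apache.spark.SparkContext._

object RangePartitionerDemo {
  def main(args: Array[String]): Unit = {
    // Illustrative local setup; any SparkContext works.
    val sc = new SparkContext(
      new SparkConf().setAppName("range-partitioner-demo").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq((5, "e"), (1, "a"), (3, "c"), (2, "b"), (4, "d")))

    // RangePartitioner samples the RDD to compute its range bounds; partitionBy
    // then shuffles records so each partition holds a contiguous key range.
    val partitioned = pairs.partitionBy(new RangePartitioner(2, pairs))
    partitioned.glom().collect().foreach(part => println(part.toSeq))

    sc.stop()
  }
}
```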