@@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
4242import org .apache .spark .network .util .ByteUnit
4343import org .apache .spark .scheduler .{CompressedMapStatus , HighlyCompressedMapStatus }
4444import org .apache .spark .storage ._
45- import org .apache .spark .util .{BoundedPriorityQueue , SerializableConfiguration , SerializableJobConf , Utils }
45+ import org .apache .spark .util .{BoundedPriorityQueue , ByteBufferInputStream , SerializableConfiguration , SerializableJobConf , Utils }
4646import org .apache .spark .util .collection .CompactBuffer
4747
4848/**
@@ -417,7 +417,12 @@ private[spark] class KryoSerializerInstance(
417417 override def deserialize [T : ClassTag ](bytes : ByteBuffer ): T = {
418418 val kryo = borrowKryo()
419419 try {
420- input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
420+ if (bytes.hasArray) {
421+ input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
422+ } else {
423+ input.setBuffer(new Array [Byte ](4096 ))
424+ input.setInputStream(new ByteBufferInputStream (bytes))
425+ }
421426 kryo.readClassAndObject(input).asInstanceOf [T ]
422427 } finally {
423428 releaseKryo(kryo)
@@ -429,7 +434,12 @@ private[spark] class KryoSerializerInstance(
429434 val oldClassLoader = kryo.getClassLoader
430435 try {
431436 kryo.setClassLoader(loader)
432- input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
437+ if (bytes.hasArray) {
438+ input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
439+ } else {
440+ input.setBuffer(new Array [Byte ](4096 ))
441+ input.setInputStream(new ByteBufferInputStream (bytes))
442+ }
433443 kryo.readClassAndObject(input).asInstanceOf [T ]
434444 } finally {
435445 kryo.setClassLoader(oldClassLoader)
0 commit comments