
Commit 27ecfe6

Davies Liu authored and committed
[SPARK-10938] [SQL] remove typeId in columnar cache
This PR removes the typeId from the columnar cache; it is no longer needed. It also removes the DATE and TIMESTAMP column types, whose values are now stored as plain INT and LONG.

Author: Davies Liu <[email protected]>

Closes #8989 from davies/refactor_cache.
1 parent 4e0027f commit 27ecfe6
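For context on the DATE/TIMESTAMP removal: Catalyst already represents a DateType value as an Int (days since the Unix epoch) and a TimestampType value as a Long (microseconds since the epoch), so the cache can store both with the plain INT and LONG column types. A minimal sketch with hypothetical values:

import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical values in Catalyst's internal form.
val days: Int = 16709                 // a DateType value: days since 1970-01-01
val micros: Long = 1444262400000000L  // a TimestampType value: microseconds since the epoch

// Both round-trip through a cache buffer exactly like ordinary Int/Long values.
val buf = ByteBuffer.allocate(12).order(ByteOrder.nativeOrder())
buf.putInt(days).putLong(micros)
buf.flip()
assert(buf.getInt() == days && buf.getLong() == micros)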

File tree

13 files changed: +63 -151 lines changed

project/MimaExcludes.scala

Lines changed: 3 additions & 1 deletion

@@ -42,7 +42,9 @@ object MimaExcludes {
       excludePackage("org.spark-project.jetty"),
       MimaBuild.excludeSparkPackage("unused"),
       // SQL execution is considered private.
-      excludePackage("org.apache.spark.sql.execution")
+      excludePackage("org.apache.spark.sql.execution"),
+      // SQL columnar is considered private.
+      excludePackage("org.apache.spark.sql.columnar")
     ) ++
     MimaBuild.excludeSparkClass("streaming.flume.FlumeTestUtils") ++
     MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils") ++

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala

Lines changed: 12 additions & 24 deletions

@@ -103,35 +103,23 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer, dataType: DataType)
   extends BasicColumnAccessor[Array[Byte]](buffer, GENERIC(dataType))
   with NullableColumnAccessor

-private[sql] class DateColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, DATE)
-
-private[sql] class TimestampColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, TIMESTAMP)
-
 private[sql] object ColumnAccessor {
   def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = {
-    val dup = buffer.duplicate().order(ByteOrder.nativeOrder)
-
-    // The first 4 bytes in the buffer indicate the column type. This field is not used now,
-    // because we always know the data type of the column ahead of time.
-    dup.getInt()
+    val buf = buffer.order(ByteOrder.nativeOrder)

     dataType match {
-      case BooleanType => new BooleanColumnAccessor(dup)
-      case ByteType => new ByteColumnAccessor(dup)
-      case ShortType => new ShortColumnAccessor(dup)
-      case IntegerType => new IntColumnAccessor(dup)
-      case DateType => new DateColumnAccessor(dup)
-      case LongType => new LongColumnAccessor(dup)
-      case TimestampType => new TimestampColumnAccessor(dup)
-      case FloatType => new FloatColumnAccessor(dup)
-      case DoubleType => new DoubleColumnAccessor(dup)
-      case StringType => new StringColumnAccessor(dup)
-      case BinaryType => new BinaryColumnAccessor(dup)
+      case BooleanType => new BooleanColumnAccessor(buf)
+      case ByteType => new ByteColumnAccessor(buf)
+      case ShortType => new ShortColumnAccessor(buf)
+      case IntegerType | DateType => new IntColumnAccessor(buf)
+      case LongType | TimestampType => new LongColumnAccessor(buf)
+      case FloatType => new FloatColumnAccessor(buf)
+      case DoubleType => new DoubleColumnAccessor(buf)
+      case StringType => new StringColumnAccessor(buf)
+      case BinaryType => new BinaryColumnAccessor(buf)
       case DecimalType.Fixed(precision, scale) if precision < 19 =>
-        new FixedDecimalColumnAccessor(dup, precision, scale)
-      case other => new GenericColumnAccessor(dup, other)
+        new FixedDecimalColumnAccessor(buf, precision, scale)
+      case other => new GenericColumnAccessor(buf, other)
     }
   }
 }
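With the type ID gone, ColumnAccessor.apply no longer consumes the first four bytes of the buffer before dispatching; the caller-supplied DataType alone selects the accessor. A hypothetical usage sketch (the empty buffer stands in for one produced by a ColumnBuilder):

import java.nio.{ByteBuffer, ByteOrder}

// DateType now resolves to a plain IntColumnAccessor over the raw buffer;
// there is no 4-byte type ID to skip at position 0 anymore.
val cached = ByteBuffer.allocate(64).order(ByteOrder.nativeOrder())
val accessor = ColumnAccessor(DateType, cached)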

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala

Lines changed: 4 additions & 12 deletions

@@ -63,9 +63,8 @@ private[sql] class BasicColumnBuilder[JvmType](
     val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
     this.columnName = columnName

-    // Reserves 4 bytes for column type ID
-    buffer = ByteBuffer.allocate(4 + size * columnType.defaultSize)
-    buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
+    buffer = ByteBuffer.allocate(size * columnType.defaultSize)
+    buffer.order(ByteOrder.nativeOrder())
   }

   override def appendFrom(row: InternalRow, ordinal: Int): Unit = {

@@ -121,11 +120,6 @@ private[sql] class FixedDecimalColumnBuilder(
 private[sql] class GenericColumnBuilder(dataType: DataType)
   extends ComplexColumnBuilder(new GenericColumnStats(dataType), GENERIC(dataType))

-private[sql] class DateColumnBuilder extends NativeColumnBuilder(new DateColumnStats, DATE)
-
-private[sql] class TimestampColumnBuilder
-  extends NativeColumnBuilder(new TimestampColumnStats, TIMESTAMP)
-
 private[sql] object ColumnBuilder {
   val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024

@@ -154,10 +148,8 @@ private[sql] object ColumnBuilder {
       case BooleanType => new BooleanColumnBuilder
       case ByteType => new ByteColumnBuilder
       case ShortType => new ShortColumnBuilder
-      case IntegerType => new IntColumnBuilder
-      case DateType => new DateColumnBuilder
-      case LongType => new LongColumnBuilder
-      case TimestampType => new TimestampColumnBuilder
+      case IntegerType | DateType => new IntColumnBuilder
+      case LongType | TimestampType => new LongColumnBuilder
       case FloatType => new FloatColumnBuilder
       case DoubleType => new DoubleColumnBuilder
       case StringType => new StringColumnBuilder
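The allocation change is easiest to see with numbers; a sketch with hypothetical sizes:

import java.nio.{ByteBuffer, ByteOrder}

val initialSize = 1024 // number of values the builder reserves room for
val defaultSize = 4    // INT.defaultSize
// Before: ByteBuffer.allocate(4 + initialSize * defaultSize), then putInt(typeId).
// After: element data only, so position 0 is the first value.
val buffer = ByteBuffer.allocate(initialSize * defaultSize).order(ByteOrder.nativeOrder())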

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala

Lines changed: 0 additions & 4 deletions

@@ -266,7 +266,3 @@ private[sql] class GenericColumnStats(dataType: DataType) extends ColumnStats {
   override def collectedStatistics: GenericInternalRow =
     new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes))
 }
-
-private[sql] class DateColumnStats extends IntColumnStats
-
-private[sql] class TimestampColumnStats extends LongColumnStats

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala

Lines changed: 13 additions & 58 deletions

@@ -38,9 +38,6 @@ private[sql] sealed abstract class ColumnType[JvmType] {
   // The catalyst data type of this column.
   def dataType: DataType

-  // A unique ID representing the type.
-  def typeId: Int
-
   // Default size in bytes for one element of type T (e.g. 4 for `Int`).
   def defaultSize: Int

@@ -107,7 +104,6 @@

 private[sql] abstract class NativeColumnType[T <: AtomicType](
     val dataType: T,
-    val typeId: Int,
     val defaultSize: Int)
   extends ColumnType[T#InternalType] {

@@ -117,7 +113,7 @@ private[sql] abstract class NativeColumnType[T <: AtomicType](
   def scalaTag: TypeTag[dataType.InternalType] = dataType.tag
 }

-private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
+private[sql] object INT extends NativeColumnType(IntegerType, 4) {
   override def append(v: Int, buffer: ByteBuffer): Unit = {
     buffer.putInt(v)
   }

@@ -145,7 +141,7 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
   }
 }

-private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
+private[sql] object LONG extends NativeColumnType(LongType, 8) {
   override def append(v: Long, buffer: ByteBuffer): Unit = {
     buffer.putLong(v)
   }

@@ -173,7 +169,7 @@ private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
   }
 }

-private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
+private[sql] object FLOAT extends NativeColumnType(FloatType, 4) {
   override def append(v: Float, buffer: ByteBuffer): Unit = {
     buffer.putFloat(v)
   }

@@ -201,7 +197,7 @@ private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
   }
 }

-private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
+private[sql] object DOUBLE extends NativeColumnType(DoubleType, 8) {
   override def append(v: Double, buffer: ByteBuffer): Unit = {
     buffer.putDouble(v)
   }

@@ -229,7 +225,7 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
   }
 }

-private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
+private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 1) {
   override def append(v: Boolean, buffer: ByteBuffer): Unit = {
     buffer.put(if (v) 1: Byte else 0: Byte)
   }

@@ -255,7 +251,7 @@ private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
   }
 }

-private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
+private[sql] object BYTE extends NativeColumnType(ByteType, 1) {
   override def append(v: Byte, buffer: ByteBuffer): Unit = {
     buffer.put(v)
   }

@@ -283,7 +279,7 @@ private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
   }
 }

-private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
+private[sql] object SHORT extends NativeColumnType(ShortType, 2) {
   override def append(v: Short, buffer: ByteBuffer): Unit = {
     buffer.putShort(v)
   }

@@ -311,7 +307,7 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
   }
 }

-private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
+private[sql] object STRING extends NativeColumnType(StringType, 8) {
   override def actualSize(row: InternalRow, ordinal: Int): Int = {
     row.getUTF8String(ordinal).numBytes() + 4
   }

@@ -343,46 +339,9 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
   override def clone(v: UTF8String): UTF8String = v.clone()
 }

-private[sql] object DATE extends NativeColumnType(DateType, 8, 4) {
-  override def extract(buffer: ByteBuffer): Int = {
-    buffer.getInt
-  }
-
-  override def append(v: Int, buffer: ByteBuffer): Unit = {
-    buffer.putInt(v)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Int = {
-    row.getInt(ordinal)
-  }
-
-  def setField(row: MutableRow, ordinal: Int, value: Int): Unit = {
-    row(ordinal) = value
-  }
-}
-
-private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 8) {
-  override def extract(buffer: ByteBuffer): Long = {
-    buffer.getLong
-  }
-
-  override def append(v: Long, buffer: ByteBuffer): Unit = {
-    buffer.putLong(v)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Long = {
-    row.getLong(ordinal)
-  }
-
-  override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = {
-    row(ordinal) = value
-  }
-}
-
 private[sql] case class FIXED_DECIMAL(precision: Int, scale: Int)
   extends NativeColumnType(
     DecimalType(precision, scale),
-    10,
     FIXED_DECIMAL.defaultSize) {

   override def extract(buffer: ByteBuffer): Decimal = {

@@ -410,9 +369,7 @@ private[sql] object FIXED_DECIMAL {
   val defaultSize = 8
 }

-private[sql] sealed abstract class ByteArrayColumnType(
-    val typeId: Int,
-    val defaultSize: Int)
+private[sql] sealed abstract class ByteArrayColumnType(val defaultSize: Int)
   extends ColumnType[Array[Byte]] {

   override def actualSize(row: InternalRow, ordinal: Int): Int = {

@@ -431,7 +388,7 @@ private[sql] sealed abstract class ByteArrayColumnType(
   }
 }

-private[sql] object BINARY extends ByteArrayColumnType(11, 16) {
+private[sql] object BINARY extends ByteArrayColumnType(16) {

   def dataType: DataType = BooleanType

@@ -447,7 +404,7 @@ private[sql] object BINARY extends ByteArrayColumnType(11, 16) {
 // Used to process generic objects (all types other than those listed above). Objects should be
 // serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized
 // byte array.
-private[sql] case class GENERIC(dataType: DataType) extends ByteArrayColumnType(12, 16) {
+private[sql] case class GENERIC(dataType: DataType) extends ByteArrayColumnType(16) {
   override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]): Unit = {
     row.update(ordinal, SparkSqlSerializer.deserialize[Any](value))
   }

@@ -463,10 +420,8 @@ private[sql] object ColumnType {
     case BooleanType => BOOLEAN
     case ByteType => BYTE
     case ShortType => SHORT
-    case IntegerType => INT
-    case DateType => DATE
-    case LongType => LONG
-    case TimestampType => TIMESTAMP
+    case IntegerType | DateType => INT
+    case LongType | TimestampType => LONG
     case FloatType => FLOAT
     case DoubleType => DOUBLE
     case StringType => STRING
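With the IDs removed, a ColumnType is identified purely by its Catalyst DataType, and date/timestamp columns share the INT/LONG singletons. A hedged check, assuming ColumnType.apply keeps the shape shown in the hunk above:

assert(ColumnType(DateType) eq INT)       // dates cached as 4-byte ints
assert(ColumnType(TimestampType) eq LONG) // timestamps cached as 8-byte longs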

sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala

Lines changed: 8 additions & 11 deletions

@@ -25,14 +25,13 @@ import org.apache.spark.sql.catalyst.InternalRow
  * A stackable trait used for building byte buffer for a column containing null values. Memory
  * layout of the final byte buffer is:
  * {{{
- *    .----------------------- Column type ID (4 bytes)
- *    |   .------------------- Null count N (4 bytes)
- *    |   |   .--------------- Null positions (4 x N bytes, empty if null count is zero)
- *    |   |   |     .--------- Non-null elements
- *    V   V   V     V
- *   +---+---+-----+---------+
- *   |   |   | ... | ... ... |
- *   +---+---+-----+---------+
+ *    .------------------- Null count N (4 bytes)
+ *    |   .--------------- Null positions (4 x N bytes, empty if null count is zero)
+ *    |   |     .--------- Non-null elements
+ *    V   V     V
+ *   +---+-----+---------+
+ *   |   | ... | ... ... |
+ *   +---+-----+---------+
  * }}}
  */
 private[sql] trait NullableColumnBuilder extends ColumnBuilder {

@@ -66,16 +65,14 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {

   abstract override def build(): ByteBuffer = {
     val nonNulls = super.build()
-    val typeId = nonNulls.getInt()
     val nullDataLen = nulls.position()

     nulls.limit(nullDataLen)
     nulls.rewind()

     val buffer = ByteBuffer
-      .allocate(4 + 4 + nullDataLen + nonNulls.remaining())
+      .allocate(4 + nullDataLen + nonNulls.remaining())
       .order(ByteOrder.nativeOrder())
-      .putInt(typeId)
       .putInt(nullCount)
       .put(nulls)
       .put(nonNulls)
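A worked size computation under the new layout, using hypothetical numbers:

// A column of 10 int values, 2 of them null.
val nullCount = 2
val nullPositions = 4 * nullCount            // "4 x N bytes" in the diagram above
val nonNullBytes = (10 - nullCount) * 4      // eight 4-byte non-null elements
val total = 4 + nullPositions + nonNullBytes // 44 bytes; the old layout needed 4 more for the type ID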

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala

Lines changed: 12 additions & 15 deletions

@@ -28,17 +28,16 @@ import org.apache.spark.sql.types.AtomicType
  * A stackable trait that builds optionally compressed byte buffer for a column. Memory layout of
  * the final byte buffer is:
  * {{{
- *    .--------------------------- Column type ID (4 bytes)
- *    |   .----------------------- Null count N (4 bytes)
- *    |   |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
- *    |   |   |     .------------- Compression scheme ID (4 bytes)
- *    |   |   |     |   .--------- Compressed non-null elements
- *    V   V   V     V   V
- *   +---+---+-----+---+---------+
- *   |   |   | ... |   | ... ... |
- *   +---+---+-----+---+---------+
- *    \-----------/ \-----------/
- *       header         body
+ *    .----------------------- Null count N (4 bytes)
+ *    |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
+ *    |   |     .------------- Compression scheme ID (4 bytes)
+ *    |   |     |   .--------- Compressed non-null elements
+ *    V   V     V   V
+ *   +---+-----+---+---------+
+ *   |   | ... |   | ... ... |
+ *   +---+-----+---+---------+
+ *    \-------/ \-----------/
+ *     header        body
 * }}}
 */
 private[sql] trait CompressibleColumnBuilder[T <: AtomicType]

@@ -83,14 +82,13 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType]

   override def build(): ByteBuffer = {
     val nonNullBuffer = buildNonNulls()
-    val typeId = nonNullBuffer.getInt()
     val encoder: Encoder[T] = {
       val candidate = compressionEncoders.minBy(_.compressionRatio)
       if (isWorthCompressing(candidate)) candidate else PassThrough.encoder(columnType)
     }

-    // Header = column type ID + null count + null positions
-    val headerSize = 4 + 4 + nulls.limit()
+    // Header = null count + null positions
+    val headerSize = 4 + nulls.limit()
     val compressedSize = if (encoder.compressedSize == 0) {
       nonNullBuffer.remaining()
     } else {

@@ -102,7 +100,6 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
       .allocate(headerSize + 4 + compressedSize)
       .order(ByteOrder.nativeOrder)
       // Write the header
-      .putInt(typeId)
       .putInt(nullCount)
       .put(nulls)
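The same arithmetic for the compressed layout, again with hypothetical numbers:

val nullCount = 2
val headerSize = 4 + 4 * nullCount          // null count + null positions, as computed in build()
val compressedSize = 100                    // whatever the chosen encoder reports
val total = headerSize + 4 + compressedSize // the extra 4 bytes hold the compression scheme ID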

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala

Lines changed: 3 additions & 3 deletions

@@ -74,8 +74,8 @@ private[sql] object CompressionScheme {

   def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
     val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
-    val nullCount = header.getInt(4)
-    // Column type ID + null count + null positions
-    4 + 4 + 4 * nullCount
+    val nullCount = header.getInt()
+    // null count + null positions
+    4 + 4 * nullCount
   }
 }
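A quick sanity check of the new header arithmetic (hypothetical buffer; CompressionScheme is private[sql], so this only compiles inside that package):

import java.nio.{ByteBuffer, ByteOrder}

val header = ByteBuffer.allocate(16).order(ByteOrder.nativeOrder())
header.putInt(3) // null count N = 3, now read from offset 0 rather than offset 4
header.rewind()
assert(CompressionScheme.columnHeaderSize(header) == 4 + 4 * 3) // 16 bytes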

sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala

Lines changed: 0 additions & 3 deletions

@@ -27,10 +27,7 @@ class ColumnStatsSuite extends SparkFunSuite {
   testColumnStats(classOf[ByteColumnStats], BYTE, createRow(Byte.MaxValue, Byte.MinValue, 0))
   testColumnStats(classOf[ShortColumnStats], SHORT, createRow(Short.MaxValue, Short.MinValue, 0))
   testColumnStats(classOf[IntColumnStats], INT, createRow(Int.MaxValue, Int.MinValue, 0))
-  testColumnStats(classOf[DateColumnStats], DATE, createRow(Int.MaxValue, Int.MinValue, 0))
   testColumnStats(classOf[LongColumnStats], LONG, createRow(Long.MaxValue, Long.MinValue, 0))
-  testColumnStats(classOf[TimestampColumnStats], TIMESTAMP,
-    createRow(Long.MaxValue, Long.MinValue, 0))
   testColumnStats(classOf[FloatColumnStats], FLOAT, createRow(Float.MaxValue, Float.MinValue, 0))
   testColumnStats(classOf[DoubleColumnStats], DOUBLE,
     createRow(Double.MaxValue, Double.MinValue, 0))
