Skip to content

Commit 2780d6a

Browse files
committed
[WIP] in-memory columnar compression support
* Added two more compression schemes (RLE and dictionary encoding)
* Moved compression support code to `columnar.compression`
* Various refactoring
1 parent 211331c commit 2780d6a

15 files changed

+698
-294
lines changed

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ import java.nio.{ByteOrder, ByteBuffer}
2121

2222
import org.apache.spark.sql.catalyst.types.{BinaryType, NativeType, DataType}
2323
import org.apache.spark.sql.catalyst.expressions.MutableRow
24+
import org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor
2425

2526
/**
2627
* An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
@@ -53,17 +54,17 @@ private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](
5354
columnType.setField(row, ordinal, extractSingle(buffer))
5455
}
5556

56-
def extractSingle(buffer: ByteBuffer) = columnType.extract(buffer)
57+
def extractSingle(buffer: ByteBuffer): JvmType = columnType.extract(buffer)
5758

5859
protected def underlyingBuffer = buffer
5960
}
6061

6162
private[sql] abstract class NativeColumnAccessor[T <: NativeType](
62-
buffer: ByteBuffer,
63-
columnType: NativeColumnType[T])
63+
override protected val buffer: ByteBuffer,
64+
override protected val columnType: NativeColumnType[T])
6465
extends BasicColumnAccessor(buffer, columnType)
6566
with NullableColumnAccessor
66-
with CompressedColumnAccessor[T]
67+
with CompressibleColumnAccessor[T]
6768

6869
private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
6970
extends NativeColumnAccessor(buffer, BOOLEAN)
@@ -98,9 +99,8 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
9899
with NullableColumnAccessor
99100

100101
private[sql] object ColumnAccessor {
101-
def apply(b: ByteBuffer): ColumnAccessor = {
102-
// The first 4 bytes in the buffer indicates the column type.
103-
val buffer = b.duplicate().order(ByteOrder.nativeOrder())
102+
def apply(buffer: ByteBuffer): ColumnAccessor = {
103+
// The first 4 bytes in the buffer indicate the column type.
104104
val columnTypeId = buffer.getInt()
105105

106106
columnTypeId match {

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.nio.{ByteBuffer, ByteOrder}
2222
import org.apache.spark.sql.Row
2323
import org.apache.spark.sql.catalyst.types._
2424
import org.apache.spark.sql.columnar.ColumnBuilder._
25+
import org.apache.spark.sql.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder}
2526

2627
private[sql] trait ColumnBuilder {
2728
/**
@@ -30,22 +31,23 @@ private[sql] trait ColumnBuilder {
3031
def initialize(initialSize: Int, columnName: String = "")
3132

3233
/**
33-
* Gathers statistics information from `row(ordinal)`.
34+
* Appends `row(ordinal)` to the column builder.
3435
*/
35-
def gatherStats(row: Row, ordinal: Int) {}
36+
def appendFrom(row: Row, ordinal: Int)
3637

3738
/**
38-
* Appends `row(ordinal)` to the column builder.
39+
* Column statistics information
3940
*/
40-
def appendFrom(row: Row, ordinal: Int)
41+
def columnStats: ColumnStats[_, _]
4142

4243
/**
4344
* Returns the final columnar byte buffer.
4445
*/
4546
def build(): ByteBuffer
4647
}
4748

48-
private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType](
49+
private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
50+
val columnStats: ColumnStats[T, JvmType],
4951
val columnType: ColumnType[T, JvmType])
5052
extends ColumnBuilder {
5153

@@ -74,20 +76,20 @@ private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType](
7476
}
7577
}
7678

77-
private[sql] abstract class NativeColumnBuilder[T <: NativeType](
78-
protected val columnStats: ColumnStats[T],
79-
columnType: NativeColumnType[T])
80-
extends BasicColumnBuilder(columnType)
79+
private[sql] abstract class ComplexColumnBuilder[T <: DataType, JvmType](
80+
columnType: ColumnType[T, JvmType])
81+
extends BasicColumnBuilder[T, JvmType](new NoopColumnStats[T, JvmType], columnType)
8182
with NullableColumnBuilder
82-
with CompressedColumnBuilder[T] {
8383

84-
override def gatherStats(row: Row, ordinal: Int) {
85-
columnStats.gatherStats(row, ordinal)
86-
}
87-
}
84+
private[sql] abstract class NativeColumnBuilder[T <: NativeType](
85+
override val columnStats: NativeColumnStats[T],
86+
override val columnType: NativeColumnType[T])
87+
extends BasicColumnBuilder[T, T#JvmType](columnStats, columnType)
88+
with NullableColumnBuilder
89+
with AllCompressionSchemes
90+
with CompressibleColumnBuilder[T]
8891

89-
private[sql] class BooleanColumnBuilder
90-
extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)
92+
private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)
9193

9294
private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)
9395

@@ -101,16 +103,12 @@ private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(new DoubleCol
101103

102104
private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColumnStats, FLOAT)
103105

104-
private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStates, STRING)
106+
private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING)
105107

106-
private[sql] class BinaryColumnBuilder
107-
extends BasicColumnBuilder[BinaryType.type, Array[Byte]](BINARY)
108-
with NullableColumnBuilder
108+
private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)
109109

110110
// TODO (lian) Add support for array, struct and map
111-
private[sql] class GenericColumnBuilder
112-
extends BasicColumnBuilder[DataType, Array[Byte]](GENERIC)
113-
with NullableColumnBuilder
111+
private[sql] class GenericColumnBuilder extends ComplexColumnBuilder(GENERIC)
114112

115113
private[sql] object ColumnBuilder {
116114
val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104

0 commit comments

Comments
 (0)