20 commits
95b3301
Fixed bugs in IntegralDelta
liancheng Apr 8, 2014
052bf41
Bug fix: should only gather compressibility info for non-null values
liancheng Apr 8, 2014
44591a5
Bug fix: NullableColumnAccessor.hasNext must take nulls into account
liancheng Apr 9, 2014
036cd09
Clean up unused imports
liancheng Apr 9, 2014
8426ddc
Bug fix: InMemoryColumnarTableScan should cache columns specified by …
liancheng Apr 9, 2014
e619995
Bug fix: incorrect byte order in CompressionScheme.columnHeaderSize
liancheng Apr 9, 2014
9c8fc40
Disable compression by default
liancheng Apr 9, 2014
c9b0f6f
Let InsertIntoTable support InMemoryColumnarTableScan
liancheng Apr 9, 2014
6360723
Made PreInsertionCasts support SparkLogicalPlan and InMemoryColumnarT…
liancheng Apr 9, 2014
2d0e168
Run Hive tests in-memory too.
marmbrus Apr 8, 2014
e36cdd0
Spelling.
marmbrus Apr 10, 2014
1965123
Don't use coalesce for gathering all data to a single partition, as i…
marmbrus Apr 10, 2014
ab9e807
Fix the logged console version of failed test cases to use the new sy…
marmbrus Apr 10, 2014
d1df4fd
Remove test tables that might always get created anyway?
marmbrus Apr 10, 2014
4390bcc
Report error for any Throwable in HiveComparisonTest
liancheng Apr 11, 2014
99382bf
Enable compression by default
liancheng Apr 11, 2014
32cc9ce
Code style cleanup
liancheng Apr 11, 2014
882c538
Remove attributes field from InMemoryColumnarTableScan
liancheng Apr 11, 2014
5bdbfe7
Revert 882c538 & 8426ddc, which introduced regression
liancheng Apr 11, 2014
6ad6d9b
Merged HiveCompatibilitySuite and HiveInMemoryCompatibilitySuite
liancheng Apr 11, 2014
@@ -99,7 +99,7 @@ class SchemaRDD(
def baseSchemaRDD = this

// =========================================================================================
-// RDD functions: Copy the interal row representation so we present immutable data to users.
+// RDD functions: Copy the internal row representation so we present immutable data to users.
// =========================================================================================

override def compute(split: Partition, context: TaskContext): Iterator[Row] =
@@ -54,4 +54,6 @@ private[sql] trait NullableColumnAccessor extends ColumnAccessor {

pos += 1
}

abstract override def hasNext = seenNulls < nullCount || super.hasNext
}
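A minimal stand-alone sketch of why the stacked hasNext above is needed (hypothetical names, not the real ColumnAccessor API): the underlying accessor iterates only the non-null values, so once they are exhausted it reports empty even though null positions remain to be emitted.

trait Accessor {
  def hasNext: Boolean
}

class ValueAccessor(values: Seq[Int]) extends Accessor {
  private val iter = values.iterator
  def hasNext: Boolean = iter.hasNext
  def next(): Int = iter.next()
}

trait NullableAccessor extends Accessor {
  protected def nullCount: Int
  protected var seenNulls = 0
  // Same shape as the fix above: pending nulls also count as remaining elements.
  abstract override def hasNext: Boolean = seenNulls < nullCount || super.hasNext
}

object HasNextSketch extends App {
  // Two non-null values plus two null positions.
  val accessor = new ValueAccessor(Seq(10, 20)) with NullableAccessor {
    protected val nullCount = 2
  }
  accessor.next(); accessor.next()
  // Without the stacked check this would already be false; with it, the two
  // nulls still have to be consumed, so it stays true.
  println(accessor.hasNext) // true
}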
@@ -65,7 +65,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]

abstract override def appendFrom(row: Row, ordinal: Int) {
super.appendFrom(row, ordinal)
-gatherCompressibilityStats(row, ordinal)
+if (!row.isNullAt(ordinal)) {
+  gatherCompressibilityStats(row, ordinal)
+}
}

abstract override def build() = {
@@ -17,7 +17,7 @@

package org.apache.spark.sql.columnar.compression

-import java.nio.ByteBuffer
+import java.nio.{ByteOrder, ByteBuffer}

import org.apache.spark.sql.catalyst.types.NativeType
import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
@@ -84,7 +84,7 @@ private[sql] object CompressionScheme {
}

def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
-val header = columnBuffer.duplicate()
+val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
val nullCount = header.getInt(4)
// Column type ID + null count + null positions
4 + 4 + 4 * nullCount
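A stand-alone JDK-only sketch (not part of the patch) of the pitfall fixed above: ByteBuffer.duplicate() resets the copy to big-endian, so a null count written in native byte order can be misread on little-endian JVMs unless the order is set again on the duplicate.

import java.nio.{ByteBuffer, ByteOrder}

object ByteOrderSketch extends App {
  val buffer = ByteBuffer.allocate(16).order(ByteOrder.nativeOrder)
  buffer.putInt(0, 3) // column type ID at offset 0
  buffer.putInt(4, 2) // null count at offset 4

  val bigEndianRead = buffer.duplicate().getInt(4)                            // 33554432 on a little-endian JVM
  val nativeRead = buffer.duplicate().order(ByteOrder.nativeOrder).getInt(4)  // always 2

  // Column type ID (4 bytes) + null count (4 bytes) + 4 bytes per null position.
  println(4 + 4 + 4 * nativeRead) // 16
}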
@@ -396,26 +396,27 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp

if (initial) {
initial = false
-prev = value
_compressedSize += 1 + columnType.defaultSize
} else {
val (smallEnough, _) = byteSizedDelta(value, prev)
_compressedSize += (if (smallEnough) 1 else 1 + columnType.defaultSize)
}

+prev = value
}

override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[I]) = {
to.putInt(typeId)

if (from.hasRemaining) {
-val prev = columnType.extract(from)
-
+var prev = columnType.extract(from)
to.put(Byte.MinValue)
columnType.append(prev, to)

while (from.hasRemaining) {
val current = columnType.extract(from)
val (smallEnough, delta) = byteSizedDelta(current, prev)
+prev = current

if (smallEnough) {
to.put(delta)
@@ -442,13 +443,8 @@

override def next() = {
val delta = buffer.get()
-
-if (delta > Byte.MinValue) {
-  addDelta(prev, delta)
-} else {
-  prev = columnType.extract(buffer)
-  prev
-}
+prev = if (delta > Byte.MinValue) addDelta(prev, delta) else columnType.extract(buffer)
+prev
}

override def hasNext = buffer.hasRemaining
@@ -464,7 +460,7 @@ private[sql] case object IntDelta extends IntegralDelta[IntegerType.type] {

override protected def byteSizedDelta(x: Int, y: Int): (Boolean, Byte) = {
val delta = x - y
-if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
}
}

@@ -477,6 +473,6 @@ private[sql] case object LongDelta extends IntegralDelta[LongType.type] {

override protected def byteSizedDelta(x: Long, y: Long): (Boolean, Byte) = {
val delta = x - y
-if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
}
}
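A toy re-implementation of the delta scheme (plain collections instead of ByteBuffers; Right stands in for the escaped full value, Left for a one-byte delta) illustrating the two behaviours corrected above: prev has to advance on every value, and a delta is byte-sized only when math.abs(delta) <= Byte.MaxValue.

object DeltaSketch extends App {
  def encode(input: Seq[Int]): Seq[Either[Byte, Int]] = {
    var prev = 0
    var initial = true
    input.map { value =>
      val encoded =
        if (initial) { initial = false; Right(value) } // first value stored verbatim
        else {
          val delta = value - prev
          if (math.abs(delta) <= Byte.MaxValue) Left(delta.toByte) // fits in one byte
          else Right(value)                                        // escaped: full value
        }
      prev = value // always advance, or later deltas are computed against a stale value
      encoded
    }
  }

  def decode(encoded: Seq[Either[Byte, Int]]): Seq[Int] = {
    var prev = 0
    encoded.map {
      case Left(delta)  => prev += delta; prev
      case Right(value) => prev = value; prev
    }
  }

  val data = Seq(2, 1, 2, 130) // mirrors the updated "simple case" test input
  println(encode(data))        // List(Right(2), Left(-1), Left(1), Right(130))
  assert(decode(encode(data)) == data)
}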
@@ -61,7 +61,14 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
shuffled.map(_._1)

case SinglePartition =>
-child.execute().coalesce(1, shuffle = true)
+val rdd = child.execute().mapPartitions { iter =>
+  val mutablePair = new MutablePair[Null, Row]()
+  iter.map(r => mutablePair.update(null, r))
+}
+val partitioner = new HashPartitioner(1)
+val shuffled = new ShuffledRDD[Null, Row, MutablePair[Null, Row]](rdd, partitioner)
+shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
+shuffled.map(_._2)

case _ => sys.error(s"Exchange not implemented for $newPartitioning")
// TODO: Handle BroadcastPartitioning.
@@ -70,8 +70,8 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
SparkLogicalPlan(
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
-case InMemoryColumnarTableScan(output, child) =>
-  InMemoryColumnarTableScan(output.map(_.newInstance), child)
+case scan @ InMemoryColumnarTableScan(output, child) =>
+  scan.copy(attributes = output.map(_.newInstance))
case _ => sys.error("Multiple instance of the same relation detected.")
}).asInstanceOf[this.type]
}
@@ -17,11 +17,10 @@

package org.apache.spark.sql

-import org.scalatest.FunSuite
import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.execution.SparkLogicalPlan
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.test.TestSQLContext

class CachedTableSuite extends QueryTest {
TestData // Load test tables.
@@ -18,10 +18,11 @@
package org.apache.spark.sql.columnar

import org.apache.spark.sql.{QueryTest, TestData}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.SparkLogicalPlan
import org.apache.spark.sql.test.TestSQLContext

-class ColumnarQuerySuite extends QueryTest {
+class InMemoryColumnarQuerySuite extends QueryTest {
import TestData._
import TestSQLContext._

@@ -32,6 +33,15 @@ class ColumnarQuerySuite extends QueryTest {
checkAnswer(scan, testData.collect().toSeq)
}

test("projection") {
val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))

checkAnswer(scan, testData.collect().map {
case Row(key: Int, value: String) => value -> key
}.toSeq)
}

test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
@@ -68,12 +68,16 @@ class NullableColumnAccessorSuite extends FunSuite {
val row = new GenericMutableRow(1)

(0 until 4).foreach { _ =>
assert(accessor.hasNext)
accessor.extractTo(row, 0)
assert(row(0) === randomRow(0))

assert(accessor.hasNext)
accessor.extractTo(row, 0)
assert(row.isNullAt(0))
}

assert(!accessor.hasNext)
}
}
}
@@ -22,6 +22,7 @@ import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.types.IntegralType
import org.apache.spark.sql.columnar._
import org.apache.spark.sql.columnar.ColumnarTestUtils._

class IntegralDeltaSuite extends FunSuite {
testIntegralDelta(new IntColumnStats, INT, IntDelta)
@@ -63,7 +64,7 @@ class IntegralDeltaSuite extends FunSuite {
} else {
val oneBoolean = columnType.defaultSize
1 + oneBoolean + deltas.map {
-d => if (math.abs(d) < Byte.MaxValue) 1 else 1 + oneBoolean
+d => if (math.abs(d) <= Byte.MaxValue) 1 else 1 + oneBoolean
}.sum
})

@@ -78,7 +79,7 @@
expectResult(input.head, "The first value is wrong")(columnType.extract(buffer))

(input.tail, deltas).zipped.foreach { (value, delta) =>
-if (delta < Byte.MaxValue) {
+if (math.abs(delta) <= Byte.MaxValue) {
expectResult(delta, "Wrong delta")(buffer.get())
} else {
expectResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get())
@@ -105,11 +106,17 @@

test(s"$scheme: simple case") {
val input = columnType match {
-case INT => Seq(1: Int, 2: Int, 130: Int)
-case LONG => Seq(1: Long, 2: Long, 130: Long)
+case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int)
+case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long)
}

skeleton(input.map(_.asInstanceOf[I#JvmType]))
}

test(s"$scheme: long random series") {
// Have to workaround with `Any` since no `ClassTag[I#JvmType]` available here.
val input = Array.fill[Any](10000)(makeRandomValue(columnType))
skeleton(input.map(_.asInstanceOf[I#JvmType]))
}
}
}
@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.SparkLogicalPlan
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -115,23 +117,31 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
case p: LogicalPlan if !p.childrenResolved => p

case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) =>
-val childOutputDataTypes = child.output.map(_.dataType)
-// Only check attributes, not partitionKeys since they are always strings.
-// TODO: Fully support inserting into partitioned tables.
-val tableOutputDataTypes = table.attributes.map(_.dataType)
-
-if (childOutputDataTypes == tableOutputDataTypes) {
-  p
-} else {
-  // Only do the casting when child output data types differ from table output data types.
-  val castedChildOutput = child.output.zip(table.output).map {
-    case (input, output) if input.dataType != output.dataType =>
-      Alias(Cast(input, output.dataType), input.name)()
-    case (input, _) => input
-  }
-
-  p.copy(child = logical.Project(castedChildOutput, child))
+castChildOutput(p, table, child)
+
+case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
+    _, HiveTableScan(_, table, _))), _, child, _) =>
+  castChildOutput(p, table, child)
}

+def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan) = {
+  val childOutputDataTypes = child.output.map(_.dataType)
+  // Only check attributes, not partitionKeys since they are always strings.
+  // TODO: Fully support inserting into partitioned tables.
+  val tableOutputDataTypes = table.attributes.map(_.dataType)
+
+  if (childOutputDataTypes == tableOutputDataTypes) {
+    p
+  } else {
+    // Only do the casting when child output data types differ from table output data types.
+    val castedChildOutput = child.output.zip(table.output).map {
+      case (input, output) if input.dataType != output.dataType =>
+        Alias(Cast(input, output.dataType), input.name)()
+      case (input, _) => input
+    }
+
+    p.copy(child = logical.Project(castedChildOutput, child))
+  }
}
}

@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
@@ -42,6 +43,9 @@
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
_, HiveTableScan(_, table, _))), partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case _ => Nil
}
}
10 changes: 4 additions & 6 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -160,12 +160,6 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
TestTable("src1",
"CREATE TABLE src1 (key INT, value STRING)".cmd,
s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
TestTable("dest1",
"CREATE TABLE IF NOT EXISTS dest1 (key INT, value STRING)".cmd),
TestTable("dest2",
"CREATE TABLE IF NOT EXISTS dest2 (key INT, value STRING)".cmd),
TestTable("dest3",
"CREATE TABLE IF NOT EXISTS dest3 (key INT, value STRING)".cmd),
TestTable("srcpart", () => {
runSqlHive(
"CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
@@ -257,6 +251,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {

private val loadedTables = new collection.mutable.HashSet[String]

var cacheTables: Boolean = false
def loadTestTable(name: String) {
if (!(loadedTables contains name)) {
// Marks the table as loaded first to prevent infinite mutually recursive table loading.
@@ -265,6 +260,9 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
val createCmds =
testTables.get(name).map(_.commands).getOrElse(sys.error(s"Unknown test table $name"))
createCmds.foreach(_())

if (cacheTables)
cacheTable(name)
}
}
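A hypothetical usage sketch of the new flag; the real call sites are in the Hive compatibility suites, which are not part of this diff, and the TestHive import path is assumed from the file location shown above.

import org.apache.spark.sql.hive.TestHive

object CachedTestTablesSketch extends App {
  TestHive.cacheTables = true    // cache every test table as it is loaded from here on
  TestHive.loadTestTable("src1") // "src1" is created, loaded, and then cached via cacheTable("src1")
}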

16 changes: 8 additions & 8 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -130,8 +130,7 @@ trait HiveFunctionFactory {
}
}

-abstract class HiveUdf
-  extends Expression with Logging with HiveFunctionFactory {
+abstract class HiveUdf extends Expression with Logging with HiveFunctionFactory {
self: Product =>

type UDFType
@@ -146,7 +145,7 @@ abstract class HiveUdf
lazy val functionInfo = getFunctionInfo(name)
lazy val function = createFunction[UDFType](name)

override def toString = s"${nodeName}#${functionInfo.getDisplayName}(${children.mkString(",")})"
override def toString = s"$nodeName#${functionInfo.getDisplayName}(${children.mkString(",")})"
}

case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUdf {
@@ -202,10 +201,11 @@ case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUd
}
}

-case class HiveGenericUdf(
-    name: String,
-    children: Seq[Expression]) extends HiveUdf with HiveInspectors {
+case class HiveGenericUdf(name: String, children: Seq[Expression])
+  extends HiveUdf with HiveInspectors {
+
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._

type UDFType = GenericUDF

@transient
@@ -357,7 +357,7 @@ case class HiveGenericUdaf(

override def toString = s"$nodeName#$name(${children.mkString(",")})"

-def newInstance = new HiveUdafFunction(name, children, this)
+def newInstance() = new HiveUdafFunction(name, children, this)
}

/**
@@ -435,7 +435,7 @@ case class HiveGenericUdtf(
}
}

-override def toString() = s"$nodeName#$name(${children.mkString(",")})"
+override def toString = s"$nodeName#$name(${children.mkString(",")})"
}

case class HiveUdafFunction(