-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-12293][SQL] Support UnsafeRow in LocalTableScan #10283
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ca4326a
a0a991a
f0e6ac0
ee89482
97a81c3
5b2a09c
97d390f
2500de3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,6 +61,7 @@ object ScalaReflection extends ScalaReflection { | |
| case t if t <:< definitions.ByteTpe => ByteType | ||
| case t if t <:< definitions.BooleanTpe => BooleanType | ||
| case t if t <:< localTypeOf[Array[Byte]] => BinaryType | ||
| case t if t <:< localTypeOf[Decimal] => DecimalType.SYSTEM_DEFAULT | ||
| case _ => | ||
| val className = getClassNameFromType(tpe) | ||
| className match { | ||
|
|
@@ -177,6 +178,7 @@ object ScalaReflection extends ScalaReflection { | |
| case _ => UpCast(expr, expected, walkedTypePath) | ||
| } | ||
|
|
||
| val className = getClassNameFromType(tpe) | ||
| tpe match { | ||
| case t if !dataTypeFor(t).isInstanceOf[ObjectType] => getPath | ||
|
|
||
|
|
@@ -372,6 +374,17 @@ object ScalaReflection extends ScalaReflection { | |
| } else { | ||
| newInstance | ||
| } | ||
|
|
||
| case t if Utils.classIsLoadable(className) && | ||
| Utils.classForName(className).isAnnotationPresent(classOf[SQLUserDefinedType]) => | ||
| val udt = Utils.classForName(className) | ||
| .getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance() | ||
| val obj = NewInstance( | ||
| udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt(), | ||
| Nil, | ||
| false, | ||
| dataType = ObjectType(udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt())) | ||
| Invoke(obj, "deserialize", ObjectType(udt.userClass), getPath :: Nil) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -406,11 +419,16 @@ object ScalaReflection extends ScalaReflection { | |
| def toCatalystArray(input: Expression, elementType: `Type`): Expression = { | ||
| val externalDataType = dataTypeFor(elementType) | ||
| val Schema(catalystType, nullable) = silentSchemaFor(elementType) | ||
| if (isNativeType(catalystType)) { | ||
| NewInstance( | ||
|
|
||
| if (isNativeType(catalystType) && !(elementType <:< localTypeOf[Option[_]])) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. Thanks. I propose this fixing as another pr #10391. |
||
| val array = NewInstance( | ||
| classOf[GenericArrayData], | ||
| input :: Nil, | ||
| dataType = ArrayType(catalystType, nullable)) | ||
| expressions.If( | ||
| IsNull(input), | ||
| expressions.Literal.create(null, ArrayType(catalystType, nullable)), | ||
| array) | ||
| } else { | ||
| val clsName = getClassNameFromType(elementType) | ||
| val newPath = s"""- array element class: "$clsName"""" +: walkedTypePath | ||
|
|
@@ -421,6 +439,7 @@ object ScalaReflection extends ScalaReflection { | |
| if (!inputObject.dataType.isInstanceOf[ObjectType]) { | ||
| inputObject | ||
| } else { | ||
| val className = getClassNameFromType(tpe) | ||
| tpe match { | ||
| case t if t <:< localTypeOf[Option[_]] => | ||
| val TypeRef(_, _, Seq(optType)) = t | ||
|
|
@@ -589,6 +608,17 @@ object ScalaReflection extends ScalaReflection { | |
| case t if t <:< localTypeOf[java.lang.Boolean] => | ||
| Invoke(inputObject, "booleanValue", BooleanType) | ||
|
|
||
| case t if Utils.classIsLoadable(className) && | ||
| Utils.classForName(className).isAnnotationPresent(classOf[SQLUserDefinedType]) => | ||
| val udt = Utils.classForName(className) | ||
| .getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance() | ||
| val obj = NewInstance( | ||
| udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt(), | ||
| Nil, | ||
| false, | ||
| dataType = ObjectType(udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt())) | ||
| Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil) | ||
|
|
||
| case other => | ||
| throw new UnsupportedOperationException( | ||
| s"No Encoder found for $tpe\n" + walkedTypePath.mkString("\n")) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,14 +18,30 @@ | |
| package org.apache.spark.sql.catalyst.util | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
| import scala.collection.mutable.WrappedArray | ||
|
|
||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.types.{DataType, Decimal} | ||
| import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} | ||
|
|
||
| object GenericArrayData { | ||
| def processSeq(seq: Seq[Any]): Array[Any] = { | ||
| seq match { | ||
| case wArray: WrappedArray[_] => | ||
| if (wArray.array == null) { | ||
| null | ||
| } else { | ||
| wArray.toArray[Any] | ||
| } | ||
| case null => null | ||
| case _ => seq.toArray | ||
| } | ||
| } | ||
| } | ||
|
|
||
| class GenericArrayData(val array: Array[Any]) extends ArrayData { | ||
|
|
||
| def this(seq: Seq[Any]) = this(seq.toArray) | ||
| def this(seq: Seq[Any]) = this(GenericArrayData.processSeq(seq)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. instead of handling null here, I think a better way is not passing null to it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. I propose the fixing as pr #10401. |
||
| def this(list: java.util.List[Any]) = this(list.asScala) | ||
|
|
||
| // TODO: This is boxing. We should specialize. | ||
|
|
@@ -39,7 +55,11 @@ class GenericArrayData(val array: Array[Any]) extends ArrayData { | |
|
|
||
| override def copy(): ArrayData = new GenericArrayData(array.clone()) | ||
|
|
||
| override def numElements(): Int = array.length | ||
| override def numElements(): Int = if (array != null) { | ||
| array.length | ||
| } else { | ||
| 0 | ||
| } | ||
|
|
||
| private def getAs[T](ordinal: Int) = array(ordinal).asInstanceOf[T] | ||
| override def isNullAt(ordinal: Int): Boolean = getAs[AnyRef](ordinal) eq null | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally
Decimalshould only be used inside spark SQL as the internal representation of decimal type, and we don't need to catch it here. Do we break it in tests?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
constructorForwill calldataTypeForto determine if a type isObjectTypeor not. If there is not case forDecimal, it will be recognized asObjectTypeand causes bug.