
Commit 226d388

hvanhovell authored and cloud-fan committed
[SPARK-19548][SQL] Support Hive UDFs which return typed Lists/Maps
## What changes were proposed in this pull request?

This PR adds support for Hive UDFs that return fully typed java Lists or Maps, for example `List<String>` or `Map<String, Integer>`. These structures may also be nested, for example `Map<String, List<Integer>>`. Raw collections and collections using wildcards remain unsupported, and cannot be supported due to the lack of type information.

## How was this patch tested?

Modified existing tests in `HiveUDFSuite`, and added test cases for raw collections and collections using wildcards.

Author: Herman van Hovell <[email protected]>

Closes #16886 from hvanhovell/SPARK-19548.
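
To make the description concrete, here is a hedged sketch (written in Scala purely for illustration; the class name `TypedMapUDF` is hypothetical and not part of this commit) of the kind of fully typed Hive UDF this change lets Spark handle:

```scala
import org.apache.hadoop.hive.ql.exec.UDF

// Hypothetical Hive UDF with a fully typed collection return type. Before this change
// Spark could not derive a Catalyst type from it; with this change the generic return
// type Map[String, Integer] (Map<String, Integer> in Java) maps to map<string,int>.
class TypedMapUDF extends UDF {
  def evaluate(o: AnyRef): java.util.Map[String, Integer] = {
    val m = new java.util.HashMap[String, Integer]()
    m.put("key", 1)
    m
  }
}
```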
1 parent d785217 commit 226d388

11 files changed: +250 -57 lines changed


sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala

Lines changed: 28 additions & 10 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.lang.reflect.{ParameterizedType, Type, WildcardType}
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.{io => hadoopIo}
@@ -178,7 +180,7 @@ import org.apache.spark.unsafe.types.UTF8String
  */
 private[hive] trait HiveInspectors {
 
-  def javaClassToDataType(clz: Class[_]): DataType = clz match {
+  def javaTypeToDataType(clz: Type): DataType = clz match {
     // writable
     case c: Class[_] if c == classOf[hadoopIo.DoubleWritable] => DoubleType
     case c: Class[_] if c == classOf[hiveIo.DoubleWritable] => DoubleType
@@ -218,26 +220,42 @@
     case c: Class[_] if c == java.lang.Float.TYPE => FloatType
     case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
 
-    case c: Class[_] if c.isArray => ArrayType(javaClassToDataType(c.getComponentType))
+    case c: Class[_] if c.isArray => ArrayType(javaTypeToDataType(c.getComponentType))
 
     // Hive seems to return this for struct types?
     case c: Class[_] if c == classOf[java.lang.Object] => NullType
 
-    // java list type unsupported
-    case c: Class[_] if c == classOf[java.util.List[_]] =>
+    case p: ParameterizedType if isSubClassOf(p.getRawType, classOf[java.util.List[_]]) =>
+      val Array(elementType) = p.getActualTypeArguments
+      ArrayType(javaTypeToDataType(elementType))
+
+    case p: ParameterizedType if isSubClassOf(p.getRawType, classOf[java.util.Map[_, _]]) =>
+      val Array(keyType, valueType) = p.getActualTypeArguments
+      MapType(javaTypeToDataType(keyType), javaTypeToDataType(valueType))
+
+    // raw java list type unsupported
+    case c: Class[_] if isSubClassOf(c, classOf[java.util.List[_]]) =>
       throw new AnalysisException(
-        "List type in java is unsupported because " +
-        "JVM type erasure makes spark fail to catch a component type in List<>")
+        "Raw list type in java is unsupported because Spark cannot infer the element type.")
 
-    // java map type unsupported
-    case c: Class[_] if c == classOf[java.util.Map[_, _]] =>
+    // raw java map type unsupported
+    case c: Class[_] if isSubClassOf(c, classOf[java.util.Map[_, _]]) =>
       throw new AnalysisException(
-        "Map type in java is unsupported because " +
-        "JVM type erasure makes spark fail to catch key and value types in Map<>")
+        "Raw map type in java is unsupported because Spark cannot infer key and value types.")
+
+    case _: WildcardType =>
+      throw new AnalysisException(
+        "Collection types with wildcards (e.g. List<?> or Map<?, ?>) are unsupported because " +
+        "Spark cannot infer the data type for these type parameters.")
 
     case c => throw new AnalysisException(s"Unsupported java type $c")
   }
 
+  private def isSubClassOf(t: Type, parent: Class[_]): Boolean = t match {
+    case cls: Class[_] => parent.isAssignableFrom(cls)
+    case _ => false
+  }
+
   private def withNullSafe(f: Any => Any): Any => Any = {
     input => if (input == null) null else f(input)
   }
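
For readers unfamiliar with `java.lang.reflect`, the following is a minimal standalone sketch (not the Spark implementation itself; the object and class names are invented for illustration) of how a parameterized Java return type can be walked recursively and mapped to a Catalyst `DataType`, mirroring the `javaTypeToDataType` cases above:

```scala
import java.lang.reflect.{ParameterizedType, Type}

import org.apache.spark.sql.types._

object TypeMappingSketch {
  // Hypothetical class standing in for a Hive UDF with a nested generic return type.
  class NestedUdfExample {
    def evaluate(o: AnyRef): java.util.Map[String, java.util.List[Integer]] =
      new java.util.HashMap()
  }

  // Maps a (possibly parameterized) java type to a Spark DataType; handles only the
  // small subset of cases needed for this example.
  def toDataType(t: Type): DataType = t match {
    case c: Class[_] if c == classOf[java.lang.String] => StringType
    case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
    case p: ParameterizedType
        if classOf[java.util.List[_]].isAssignableFrom(p.getRawType.asInstanceOf[Class[_]]) =>
      val Array(elementType) = p.getActualTypeArguments
      ArrayType(toDataType(elementType))
    case p: ParameterizedType
        if classOf[java.util.Map[_, _]].isAssignableFrom(p.getRawType.asInstanceOf[Class[_]]) =>
      val Array(keyType, valueType) = p.getActualTypeArguments
      MapType(toDataType(keyType), toDataType(valueType))
    case other => throw new IllegalArgumentException(s"Unsupported java type $other")
  }

  def main(args: Array[String]): Unit = {
    val method = classOf[NestedUdfExample].getMethod("evaluate", classOf[Object])
    // The generic (non-erased) return type carries the full type arguments, so this
    // should print something like: MapType(StringType,ArrayType(IntegerType,true),true)
    println(toDataType(method.getGenericReturnType))
  }
}
```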

sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala

Lines changed: 1 addition & 1 deletion
@@ -70,7 +70,7 @@ private[hive] case class HiveSimpleUDF(
   @transient
   private lazy val conversionHelper = new ConversionHelper(method, arguments)
 
-  override lazy val dataType = javaClassToDataType(method.getReturnType)
+  override lazy val dataType = javaTypeToDataType(method.getGenericReturnType)
 
   @transient
   private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray
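
The one-line change above is what makes the typed return types visible at all: `Method.getReturnType` only exposes the erased class, while `getGenericReturnType` keeps the type arguments. A small illustrative sketch (class and object names are invented for the example):

```scala
import java.util.{Collections, List => JList}

object ErasureDemo {
  // Hypothetical stand-in for a Hive UDF whose evaluate method returns List<String>.
  class StringListUdfExample {
    def evaluate(o: AnyRef): JList[String] = Collections.emptyList()
  }

  def main(args: Array[String]): Unit = {
    val m = classOf[StringListUdfExample].getMethod("evaluate", classOf[Object])
    println(m.getReturnType)        // interface java.util.List   (element type erased)
    println(m.getGenericReturnType) // java.util.List<java.lang.String>
  }
}
```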
sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * UDF that returns a raw (non-parameterized) java List.
+ */
+public class UDFRawList extends UDF {
+  public List evaluate(Object o) {
+    return Collections.singletonList("data1");
+  }
+}
sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * UDF that returns a raw (non-parameterized) java Map.
+ */
+public class UDFRawMap extends UDF {
+  public Map evaluate(Object o) {
+    return Collections.singletonMap("a", "1");
+  }
+}

sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToIntIntMap.java

Lines changed: 9 additions & 9 deletions
@@ -23,13 +23,13 @@
 import java.util.Map;
 
 public class UDFToIntIntMap extends UDF {
-    public Map<Integer, Integer> evaluate(Object o) {
-        return new HashMap<Integer, Integer>() {
-            {
-                put(1, 1);
-                put(2, 1);
-                put(3, 1);
-            }
-        };
-    }
+  public Map<Integer, Integer> evaluate(Object o) {
+    return new HashMap<Integer, Integer>() {
+      {
+        put(1, 1);
+        put(2, 1);
+        put(3, 1);
+      }
+    };
+  }
 }

sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java

Lines changed: 4 additions & 4 deletions
@@ -19,11 +19,11 @@
 
 import org.apache.hadoop.hive.ql.exec.UDF;
 
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 
 public class UDFToListInt extends UDF {
-    public List<Integer> evaluate(Object o) {
-        return Arrays.asList(1, 2, 3);
-    }
+  public ArrayList<Integer> evaluate(Object o) {
+    return new ArrayList<>(Arrays.asList(1, 2, 3));
+  }
 }
sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListMapStringListInt.java

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+import java.util.*;
+
+/**
+ * UDF that returns a nested list of maps that uses a string as its key and a list of ints as its
+ * values.
+ */
+public class UDFToListMapStringListInt extends UDF {
+  public List<Map<String, List<Integer>>> evaluate(Object o) {
+    final Map<String, List<Integer>> map = new HashMap<>();
+    map.put("a", Arrays.asList(1, 2));
+    map.put("b", Arrays.asList(3, 4));
+    return Collections.singletonList(map);
+  }
+}
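
As a hedged usage sketch (the temporary function name is hypothetical and `spark` is assumed to be a Hive-enabled SparkSession; the actual registrations live in `HiveUDFSuite`, which is not shown in this excerpt), a nested UDF like the one above could be exercised as follows:

```scala
spark.sql(
  """CREATE TEMPORARY FUNCTION udf_nested AS
    |'org.apache.spark.sql.hive.execution.UDFToListMapStringListInt'""".stripMargin)

val result = spark.sql("SELECT udf_nested('ignored') AS value")
// With this change the return type resolves to a nested Catalyst type,
// roughly: value: array<map<string,array<int>>>
result.printSchema()
result.show(truncate = false)
```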

sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java

Lines changed: 3 additions & 3 deletions
@@ -23,7 +23,7 @@
 import java.util.List;
 
 public class UDFToListString extends UDF {
-    public List<String> evaluate(Object o) {
-        return Arrays.asList("data1", "data2", "data3");
-    }
+  public List<String> evaluate(Object o) {
+    return Arrays.asList("data1", "data2", "data3");
+  }
 }

sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToStringIntMap.java

Lines changed: 9 additions & 10 deletions
@@ -20,16 +20,15 @@
 import org.apache.hadoop.hive.ql.exec.UDF;
 
 import java.util.HashMap;
-import java.util.Map;
 
 public class UDFToStringIntMap extends UDF {
-    public Map<String, Integer> evaluate(Object o) {
-        return new HashMap<String, Integer>() {
-            {
-                put("key1", 1);
-                put("key2", 2);
-                put("key3", 3);
-            }
-        };
-    }
+  public HashMap<String, Integer> evaluate(Object o) {
+    return new HashMap<String, Integer>() {
+      {
+        put("key1", 1);
+        put("key2", 2);
+        put("key3", 3);
+      }
+    };
+  }
 }
sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFWildcardList.java

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * UDF that returns a java List with a wildcard type parameter.
+ */
+public class UDFWildcardList extends UDF {
+  public List<?> evaluate(Object o) {
+    return Collections.singletonList("data1");
+  }
+}
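
Conversely, the wildcard return type above remains unsupported. A hedged sketch of the expected failure mode (the function name is hypothetical and `spark` is assumed to be a Hive-enabled SparkSession):

```scala
import org.apache.spark.sql.AnalysisException

spark.sql(
  """CREATE TEMPORARY FUNCTION udf_wildcard AS
    |'org.apache.spark.sql.hive.execution.UDFWildcardList'""".stripMargin)

try {
  spark.sql("SELECT udf_wildcard('ignored')").collect()
} catch {
  case e: AnalysisException =>
    // Expected: the message explains that wildcard collection types are unsupported.
    println(e.getMessage)
}
```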
