Commit 305418c

zhzhanliancheng authored and committed
orc data source support
1 parent 578bfee commit 305418c

File tree

10 files changed: +1883 -0 lines changed

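For context on the diffs below, a rough usage sketch of what an ORC data source under org.apache.spark.sql.hive.orc enables through Spark SQL's data sources API. The table name and paths here are assumptions for illustration, and the exact user-facing entry point added by this commit is not shown in the diff.

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext("local[2]", "orc-example")
val hiveContext = new HiveContext(sc)
val people = hiveContext.sql("SELECT name, age FROM people")  // hypothetical Hive table

// Save and load back through the fully qualified data source name.
people.save("/tmp/people_orc", "org.apache.spark.sql.hive.orc")
val loaded = hiveContext.load("/tmp/people_orc", "org.apache.spark.sql.hive.orc")
loaded.printSchema()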
Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.orc

import org.apache.hadoop.hive.common.`type`.HiveVarchar
import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.spark.sql.catalyst.expressions.{Row, MutableRow}

import scala.collection.JavaConversions._

/**
 * We can consolidate TableReader.unwrappers and HiveInspectors.wrapperFor to use
 * this class.
 */
private[hive] object HadoopTypeConverter extends HiveInspectors {
  /**
   * Builds specific unwrappers ahead of time according to object inspector
   * types to avoid pattern matching and branching costs per row.
   */
  def unwrappers(fieldRefs: Seq[StructField]): Seq[(Any, MutableRow, Int) => Unit] = fieldRefs.map {
    _.getFieldObjectInspector match {
      case oi: BooleanObjectInspector =>
        (value: Any, row: MutableRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
      case oi: ByteObjectInspector =>
        (value: Any, row: MutableRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
      case oi: ShortObjectInspector =>
        (value: Any, row: MutableRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
      case oi: IntObjectInspector =>
        (value: Any, row: MutableRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
      case oi: LongObjectInspector =>
        (value: Any, row: MutableRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
      case oi: FloatObjectInspector =>
        (value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
      case oi: DoubleObjectInspector =>
        (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
      case oi =>
        (value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrap(value, oi)
    }
  }

  /**
   * Wraps with Hive types based on object inspector.
   */
  def wrappers(oi: ObjectInspector): Any => Any = oi match {
    case _: JavaHiveVarcharObjectInspector =>
      (o: Any) => new HiveVarchar(o.asInstanceOf[String], o.asInstanceOf[String].size)

    case _: JavaHiveDecimalObjectInspector =>
      (o: Any) => HiveShim.createDecimal(o.asInstanceOf[BigDecimal].underlying())

    case soi: StandardStructObjectInspector =>
      val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
      (o: Any) => {
        val struct = soi.create()
        (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row].toSeq).zipped.foreach {
          (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
        }
        struct
      }

    case loi: ListObjectInspector =>
      val wrapper = wrapperFor(loi.getListElementObjectInspector)
      (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper))

    case moi: MapObjectInspector =>
      val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
      val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
      (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
        keyWrapper(key) -> valueWrapper(value)
      })

    case _ =>
      identity[Any]
  }
}
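A minimal sketch (not part of this commit) of how the precomputed unwrappers could drive an ORC scan: build the field unwrappers once from the struct object inspector, then apply them per row into a reused MutableRow. The helper name is hypothetical, and since HadoopTypeConverter is private[hive], the snippet assumes it lives under org.apache.spark.sql.hive.

import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

import scala.collection.JavaConversions._

// Hypothetical helper: convert one deserialized ORC struct into a Catalyst row.
def convertRow(rawRow: Any, soi: StructObjectInspector): GenericMutableRow = {
  val fieldRefs = soi.getAllStructFieldRefs.toSeq
  // Built here for clarity; in a real scan these would be built once per split, not per row.
  val fieldUnwrappers = HadoopTypeConverter.unwrappers(fieldRefs)
  val mutableRow = new GenericMutableRow(fieldRefs.length)
  var i = 0
  while (i < fieldRefs.length) {
    val fieldValue = soi.getStructFieldData(rawRow, fieldRefs(i))
    if (fieldValue == null) {
      mutableRow.setNullAt(i)
    } else {
      fieldUnwrappers(i)(fieldValue, mutableRow, i)
    }
    i += 1
  }
  mutableRow
}

The wrappers side is the mirror image used on the write path: wrappers(oi) returns a function that converts a Catalyst value back into the Hive representation the ORC serde expects.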
Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.orc

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType

private[orc] object OrcFileOperator extends Logging {

  def getFileReader(pathStr: String, config: Option[Configuration] = None): Reader = {
    val conf = config.getOrElse(new Configuration)
    val fspath = new Path(pathStr)
    val fs = fspath.getFileSystem(conf)
    val orcFiles = listOrcFiles(pathStr, conf)
    OrcFile.createReader(fs, orcFiles(0))
  }

  def readSchema(path: String, conf: Option[Configuration]): StructType = {
    val reader = getFileReader(path, conf)
    val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
    val schema = readerInspector.getTypeName
    HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
  }

  def getObjectInspector(path: String, conf: Option[Configuration]): StructObjectInspector = {
    val reader = getFileReader(path, conf)
    reader.getObjectInspector.asInstanceOf[StructObjectInspector]
  }

  def deletePath(pathStr: String, conf: Configuration): Unit = {
    val fspath = new Path(pathStr)
    val fs = fspath.getFileSystem(conf)
    try {
      fs.delete(fspath, true)
    } catch {
      case e: IOException =>
        throw new IOException(
          s"Unable to clear output directory ${fspath.toString} prior" +
            s" to InsertIntoOrcTable:\n${e.toString}")
    }
  }

  def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
    val origPath = new Path(pathStr)
    val fs = origPath.getFileSystem(conf)
    val path = origPath.makeQualified(fs)
    val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
      .filterNot(_.isDir)
      .map(_.getPath)
      .filterNot(_.getName.startsWith("_"))
      .filterNot(_.getName.startsWith("."))

    if (paths == null || paths.size == 0) {
      throw new IllegalArgumentException(
        s"orcFileOperator: path $path does not have valid orc files matching the pattern")
    }
    logInfo("Qualified file list: ")
    paths.foreach { p => logInfo(p.toString) }
    paths
  }
}
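A small usage sketch (not part of this commit) of deriving a Spark SQL StructType from an existing directory of ORC files; the path is hypothetical, and since OrcFileOperator is private[orc], the caller is assumed to sit in the same package.

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Opens the first ORC file under the path and converts the type string of its
// ObjectInspector into a Catalyst StructType via HiveMetastoreTypes.
val schema = OrcFileOperator.readSchema("hdfs:///warehouse/events_orc", Some(conf))
schema.fields.foreach(f => println(s"${f.name}: ${f.dataType}"))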
Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.orc

import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
import org.apache.spark.Logging
import org.apache.spark.sql.sources._

private[sql] object OrcFilters extends Logging {

  def createFilter(expr: Array[Filter]): Option[SearchArgument] = {
    if (expr == null || expr.size == 0) return None
    var sarg: Option[Builder] = Some(SearchArgument.FACTORY.newBuilder())
    sarg.get.startAnd()
    expr.foreach { x =>
      sarg match {
        case Some(s1) => sarg = createFilter(x, s1)
        case _ =>
      }
    }
    sarg match {
      case Some(b) => Some(b.end.build)
      case _ => None
    }
  }

  def createFilter(expression: Filter, builder: Builder): Option[Builder] = {
    expression match {
      case And(left, right) =>
        val b1 = builder.startAnd()
        createFilter(left, b1) match {
          case Some(b2) => createFilter(right, b2).map(_.end)
          case _ => None
        }
      case Or(left, right) =>
        val b1 = builder.startOr()
        createFilter(left, b1) match {
          case Some(b2) => createFilter(right, b2).map(_.end)
          case _ => None
        }
      case Not(child) =>
        val b1 = builder.startNot()
        createFilter(child, b1).map(_.end)
      case EqualTo(attribute, value) =>
        Some(builder.equals(attribute, value))
      case LessThan(attribute, value) =>
        Some(builder.lessThan(attribute, value))
      case LessThanOrEqual(attribute, value) =>
        Some(builder.lessThanEquals(attribute, value))
      case GreaterThan(attribute, value) =>
        // The builder only offers less-than forms, so `x > v` is expressed as `NOT (x <= v)`.
        Some(builder.startNot().lessThanEquals(attribute, value).end())
      case GreaterThanOrEqual(attribute, value) =>
        Some(builder.startNot().lessThan(attribute, value).end())
      case IsNull(attribute) =>
        Some(builder.isNull(attribute))
      case In(attribute, values) =>
        Some(builder.in(attribute, values.map(_.asInstanceOf[AnyRef]): _*))
      // not supported in filter
      // case EqualNullSafe(left, right) =>
      //   Some(builder.nullSafeEquals(left, right))
      case _ => None
    }
  }
}
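A short sketch (not part of this commit) of turning data source filters into an ORC SearchArgument for predicate pushdown; the column names and values are hypothetical, and the caller is assumed to sit under org.apache.spark.sql because OrcFilters is private[sql].

import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

val filters: Array[Filter] = Array(EqualTo("country", "US"), GreaterThan("age", 21L))
// Some(SearchArgument) only if every filter could be converted; the result can
// then be handed to the ORC record reader for row-group skipping.
val sarg = OrcFilters.createFilter(filters)
sarg.foreach(s => println(s.toString))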
