diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 368c3d0008b07..233fe1225a43e 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -597,6 +597,96 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
+## ORC Files
+
+[ORC](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC) is a columnar storage format supported by Hive. It provides a highly efficient way to store data on HDFS and speeds up query performance.
+Spark SQL provides support for both reading and writing ORC files that automatically preserves the schema
+of the original data.
+
+### Loading Data Programmatically
+
+Using the data from the above example:
+
+
+
+
+
+{% highlight scala %}
+// Use HiveContext to read or write ORC files.
+val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
+import sqlContext._
+val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
+
+// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using ORC.
+people.registerTempTable("people")
+people.saveAsOrcFile("people.orc")
+
+// Read in the ORC file created above. ORC files are self-describing so the schema is preserved.
+// The result of loading an ORC file is also a SchemaRDD.
+val orcFile = sqlContext.orcFile("people.orc")
+
+// ORC files can also be registered as tables and then used in SQL statements.
+orcFile.registerTempTable("orcFile")
+val teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19")
+teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
+{% endhighlight %}
+
+
+
+
+
+{% highlight java %}
+// Use JavaHiveContext to read or write ORC files.
+JavaHiveContext sqlContext = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc);
+JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.
+
+// JavaSchemaRDDs can be saved as ORC files, maintaining the schema information.
+schemaPeople.saveAsOrcFile("people.orc");
+
+// Read in the ORC file created above. ORC files are self-describing so the schema is preserved.
+// The result of loading an ORC file is also a JavaSchemaRDD.
+JavaSchemaRDD orcFile = sqlContext.orcFile("people.orc");
+
+// ORC files can also be registered as tables and then used in SQL statements.
+orcFile.registerTempTable("orcFile");
+JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19");
+List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
+ public String call(Row row) {
+ return "Name: " + row.getString(0);
+ }
+}).collect();
+{% endhighlight %}
+
+
+
+
+
+{% highlight python %}
+# Use HiveContext to read or write ORC files.
+from pyspark.sql import HiveContext
+sqlContext = HiveContext(sc)
+
+schemaPeople # The SchemaRDD from the previous example.
+
+# SchemaRDDs can be saved as ORC files, maintaining the schema information.
+schemaPeople.saveAsOrcFile("people.orc")
+
+# Read in the ORC file created above. ORC files are self-describing so the schema is preserved.
+# The result of loading an ORC file is also a SchemaRDD.
+orcFile = sqlContext.orcFile("people.orc")
+
+# ORC files can also be registered as tables and then used in SQL statements.
+orcFile.registerTempTable("orcFile")
+teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19")
+teenNames = teenagers.map(lambda p: "Name: " + p.name)
+for teenName in teenNames.collect():
+ print teenName
+{% endhighlight %}
+
+
+
+
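+ORC output is written with the ZLIB compression codec by default. When running against Hive 0.13.1
+or later, the codec can be changed through the `hive.exec.orc.default.compress` Hadoop configuration
+property; earlier Hive versions ignore this setting. The snippet below is a minimal Scala sketch,
+assuming the `sc` and `people` definitions from the example above.
+
+{% highlight scala %}
+// Only takes effect with Hive 0.13.1 and above; earlier versions always write ZLIB.
+sc.hadoopConfiguration.set("hive.exec.orc.default.compress", "SNAPPY")
+
+people.saveAsOrcFile("people_snappy.orc")
+{% endhighlight %}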
+
## JSON Datasets
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 75b6e37c2a1f9..305dc9b3f21fb 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -271,7 +271,7 @@ package object dsl {
object plans { // scalastyle:ignore
implicit class DslLogicalPlan(val logicalPlan: LogicalPlan) extends LogicalPlanFunctions {
- def writeToFile(path: String) = WriteToFile(path, logicalPlan)
+ def writeToFile(path: String) = WriteToParquetFile(path, logicalPlan)
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 14b03c7445c13..1c28fc8fe4797 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -122,7 +122,7 @@ case class CreateTableAsSelect(
override lazy val resolved = (databaseName != None && childrenResolved)
}
-case class WriteToFile(
+case class WriteToParquetFile(
path: String,
child: LogicalPlan) extends UnaryNode {
override def output = child.output
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 25ba7d88ba538..5b42011e35743 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike {
@transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
// For various commands (like DDL) and queries with side effects, we force query optimization to
// happen right away to let these side effects take place eagerly.
- case _: Command | _: InsertIntoTable | _: CreateTableAsSelect |_: WriteToFile =>
+ case _: Command | _: InsertIntoTable | _: CreateTableAsSelect | _: WriteToParquetFile =>
LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
case _ =>
baseLogicalPlan
@@ -73,7 +73,7 @@ private[sql] trait SchemaRDDLike {
* @group schema
*/
def saveAsParquetFile(path: String): Unit = {
- sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
+ sqlContext.executePlan(WriteToParquetFile(path, logicalPlan)).toRdd
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 79e4ddb8c4f5d..ee70032f0f7b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -198,8 +198,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object ParquetOperations extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- // TODO: need to support writing to other types of files. Unify the below code paths.
- case logical.WriteToFile(path, child) =>
+ case logical.WriteToParquetFile(path, child) =>
val relation =
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
// Note: overwrite=false because otherwise the metadata we just created will be deleted
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 5c6fa78ae3895..e52a5eea7a44b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -283,7 +283,8 @@ case class InsertIntoParquetTable(
1
} else {
FileSystemHelper
- .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
+ .findMaxTaskId(
+ NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration, "parquet") + 1
}
def writeShard(context: TaskContext, iter: Iterator[Row]): Int = {
@@ -496,7 +497,7 @@ private[parquet] object FilteringParquetRowInputFormat {
.build[FileStatus, Array[BlockLocation]]()
}
-private[parquet] object FileSystemHelper {
+private[sql] object FileSystemHelper {
def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
@@ -512,19 +513,37 @@ private[parquet] object FileSystemHelper {
fs.listStatus(path).map(_.getPath)
}
- /**
- * Finds the maximum taskid in the output file names at the given path.
- */
- def findMaxTaskId(pathStr: String, conf: Configuration): Int = {
+ /**
+ * List files with special extension
+ */
+ def listFiles(origPath: Path, conf: Configuration, extension: String): Seq[Path] = {
+ val fs = origPath.getFileSystem(conf)
+ if (fs == null) {
+ throw new IllegalArgumentException(
+ s"Path $origPath is incorrectly formatted")
+ }
+ val path = origPath.makeQualified(fs)
+ if (fs.exists(path)) {
+ fs.listStatus(path).map(_.getPath).filter(p => p.getName.endsWith(extension))
+ } else {
+ Seq.empty
+ }
+ }
+
+ /**
+ * Finds the maximum taskid in the output file names at the given path.
+ */
+ def findMaxTaskId(pathStr: String, conf: Configuration, extension: String): Int = {
+ // filename pattern is part-r-<taskid>.$extension
+ require(Seq("orc", "parquet").contains(extension), s"Unsupported extension: $extension")
+ val nameP = new scala.util.matching.Regex(s"""part-r-(\\d{1,}).$extension""", "taskid")
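+ // For example, "part-r-3.orc" yields task id 3; hidden files (e.g. "_SUCCESS") are mapped to 0 below.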
val files = FileSystemHelper.listFiles(pathStr, conf)
- // filename pattern is part-r-.parquet
- val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).parquet""", "taskid")
val hiddenFileP = new scala.util.matching.Regex("_.*")
files.map(_.getName).map {
case nameP(taskid) => taskid.toInt
case hiddenFileP() => 0
case other: String => {
- sys.error("ERROR: attempting to append to set of Parquet files and found file" +
+ sys.error(s"ERROR: attempting to append to set of $extension files and found file " +
s"that does not match name pattern: $other")
0
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 8b5a90159e1bb..8d6530d308456 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -24,6 +24,7 @@ import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{TypeTag, typeTag}
+import scala.Some
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
@@ -38,14 +39,16 @@ import org.apache.hadoop.hive.serde2.io.DateWritable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.orc.OrcSchemaRDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators}
import org.apache.spark.sql.catalyst.analysis.{OverrideCatalog, OverrideFunctionRegistry}
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.ExtractPythonUdfs
-import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.{Command => PhysicalCommand}
+import org.apache.spark.sql.catalyst.plans.logical.SetCommand
+import org.apache.spark.sql.catalyst.plans.logical.NativeCommand
import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
/**
@@ -110,6 +113,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")
def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery)
+ /**
+ * Creates a SchemaRDD from an RDD of case classes. The returned SchemaRDD is an
+ * [[org.apache.spark.sql.hive.orc.OrcSchemaRDD]], which additionally supports `saveAsOrcFile`.
+ *
+ * @group userf
+ */
+ implicit override def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = {
+ SparkPlan.currentContext.set(self)
+ new OrcSchemaRDD(this,
+ LogicalRDD(ScalaReflection.attributesFor[A], RDDConversions.productToRowRdd(rdd))(self))
+ }
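+ // A minimal usage sketch (assuming `hiveContext` is this HiveContext and `rdd` is an RDD of
+ // case class instances, as in the SQL programming guide):
+ //   import hiveContext._
+ //   rdd.saveAsOrcFile("people.orc")  // available because the implicit yields an OrcSchemaRDD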
+
/**
* Creates a table using the schema of the given class.
*
@@ -121,6 +135,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
}
+ /**
+ * Loads an ORC file, returning the result as a [[SchemaRDD]].
+ *
+ * @group userf
+ */
+ def orcFile(path: String): SchemaRDD = new SchemaRDD(
+ this, orc.OrcRelation(Seq.empty, path, Some(sparkContext.hadoopConfiguration), this))
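+ // Usage sketch: the returned SchemaRDD can be registered and queried with SQL, e.g.
+ //   orcFile("people.orc").registerTempTable("people")
+ //   sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")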
+
/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
@@ -334,6 +356,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
HiveCommandStrategy(self),
TakeOrdered,
ParquetOperations,
+ OrcOperations,
InMemoryScans,
ParquetConversion, // Must be before HiveTableScans
HiveTableScans,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5c66322f1ed99..fdcd8aa58c922 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -22,12 +22,13 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
import org.apache.spark.sql.hive
import org.apache.spark.sql.hive.execution._
+import org.apache.spark.sql.hive.orc.{WriteToOrcFile, InsertIntoOrcTable, OrcRelation, OrcTableScan}
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.{SQLContext, SchemaRDD}
@@ -221,4 +222,24 @@ private[hive] trait HiveStrategies {
case _ => Nil
}
}
+
+ object OrcOperations extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case WriteToOrcFile(path, child) =>
+ val relation =
+ OrcRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
+ InsertIntoOrcTable(relation, planLater(child), overwrite = true) :: Nil
+ case logical.InsertIntoTable(table: OrcRelation, partition, child, overwrite) =>
+ InsertIntoOrcTable(table, planLater(child), overwrite) :: Nil
+ case PhysicalOperation(projectList, filters, relation: OrcRelation) =>
+ // TODO: need to implement predicate push down.
+ val prunePushedDownFilters = identity[Seq[Expression]] _
+ pruneFilterProject(
+ projectList,
+ filters,
+ prunePushedDownFilters,
+ OrcTableScan(_, relation, None)) :: Nil
+ case _ => Nil
+ }
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
new file mode 100644
index 0000000000000..1c453f9ecd2bd
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import java.util.Properties
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
+import org.apache.hadoop.hive.ql.io.orc._
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedException, MultiInstanceRelation}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.hive.HiveMetastoreTypes
+import org.apache.spark.sql.parquet.FileSystemHelper
+import org.apache.spark.sql.SQLContext
+
+import scala.collection.JavaConversions._
+
+private[sql] case class OrcRelation(
+ attributes: Seq[Attribute],
+ path: String,
+ @transient conf: Option[Configuration],
+ @transient sqlContext: SQLContext,
+ partitioningAttributes: Seq[Attribute] = Nil)
+ extends LeafNode with MultiInstanceRelation {
+ self: Product =>
+
+ val prop: Properties = new Properties
+
+ override lazy val output = attributes ++ OrcFileOperator.orcSchema(path, conf, prop)
+
+ // TODO: use statistics in ORC file
+ override lazy val statistics = Statistics(sizeInBytes = sqlContext.defaultSizeInBytes)
+
+ override def newInstance() =
+ OrcRelation(attributes, path, conf, sqlContext).asInstanceOf[this.type]
+}
+
+private[sql] object OrcRelation {
+ /**
+ * Creates a new OrcRelation and underlying ORC file for the given LogicalPlan. Note that
+ * this is used inside [[org.apache.spark.sql.execution.SparkStrategies]] to
+ * create a resolved relation as a data sink for writing to an ORC file.
+ *
+ * @param pathString The directory the ORC file will be stored in.
+ * @param child The child node that will be used for extracting the schema.
+ * @param conf A configuration to be used.
+ * @return An empty OrcRelation with inferred metadata.
+ */
+ def create(
+ pathString: String,
+ child: LogicalPlan,
+ conf: Configuration,
+ sqlContext: SQLContext): OrcRelation = {
+ if (!child.resolved) {
+ throw new UnresolvedException[LogicalPlan](
+ child,
+ "Attempt to create Orc table from unresolved child")
+ }
+ val path = checkPath(pathString, false, conf)
+ new OrcRelation(child.output, path.toString, Some(conf), sqlContext)
+ }
+
+ private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = {
+ require(pathStr != null, "Unable to create OrcRelation: path is null")
+ val origPath = new Path(pathStr)
+ val fs = origPath.getFileSystem(conf)
+ require(fs != null, s"Unable to create OrcRelation: incorrectly formatted path $pathStr")
+ val path = origPath.makeQualified(fs)
+ if (!allowExisting) {
+ require(!fs.exists(path), s"File $pathStr already exists.")
+ }
+ if (fs.exists(path)) {
+ require(fs.getFileStatus(path).getPermission.getUserAction.implies(FsAction.READ_WRITE),
+ s"Unable to create OrcRelation: path $path not read-writable")
+ }
+ path
+ }
+}
+
+private[sql] object OrcFileOperator {
+ def getMetaDataReader(origPath: Path, configuration: Option[Configuration]): Reader = {
+ val conf = configuration.getOrElse(new Configuration())
+ val fs: FileSystem = origPath.getFileSystem(conf)
+ val orcFiles = FileSystemHelper.listFiles(origPath, conf, ".orc")
+ if (orcFiles.isEmpty) {
+ // No ORC files exist yet (e.g. when writing a new ORC file), so there is no metadata to read.
+ return null
+ }
+ OrcFile.createReader(fs, orcFiles(0))
+ }
+
+ def orcSchema(
+ path: String,
+ conf: Option[Configuration],
+ prop: Properties): Seq[Attribute] = {
+ // get the schema info through ORC Reader
+ val origPath = new Path(path)
+ val reader = getMetaDataReader(origPath, conf)
+ if (reader == null) {
+ // No existing ORC file to inspect (e.g. during saveAsOrcFile), so return an empty schema.
+ return Seq.empty
+ }
+ val inspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
+ // The inspector's type name encodes the schema as a Hive struct type string.
+ val schema = inspector.getTypeName
+ // Set the column name/type properties; they are needed later to initialize OrcSerde.
+ val fields = inspector.getAllStructFieldRefs
+ val (columns, columnTypes) = fields.map { f =>
+ f.getFieldName -> f.getFieldObjectInspector.getTypeName
+ }.unzip
+ prop.setProperty("columns", columns.mkString(","))
+ prop.setProperty("columns.types", columnTypes.mkString(":"))
+
+ HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType].toAttributes
+ }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala
new file mode 100644
index 0000000000000..82aeb12a55a8b
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.hive.orc
+
+import java.io.IOException
+import java.text.SimpleDateFormat
+import java.util.{Locale, Date}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.io.{Writable, NullWritable}
+import org.apache.hadoop.mapreduce.{TaskID, TaskAttemptContext, Job}
+import org.apache.hadoop.mapred.{SparkHadoopMapRedUtil, Reporter, JobConf}
+import org.apache.hadoop.hive.ql.io.orc.{OrcSerde, OrcInputFormat, OrcOutputFormat}
+import org.apache.hadoop.hive.serde2.objectinspector._
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
+import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, TypeInfo}
+
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.parquet.FileSystemHelper
+import org.apache.spark.{TaskContext, SerializableWritable}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode => LogicalUnaryNode}
+import org.apache.spark.sql.catalyst.types.StructType
+import org.apache.spark.sql.execution.UnaryNode
+import org.apache.spark.sql.hive.{HiveMetastoreTypes, HadoopTableReader}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Logical plan node that writes its child's output to an ORC file.
+ */
+case class WriteToOrcFile(
+ path: String,
+ child: LogicalPlan) extends LogicalUnaryNode {
+ def output = child.output
+}
+
+/**
+ * ORC table scan operator. Imports the file that backs the given
+ * [[org.apache.spark.sql.hive.orc.OrcRelation]] as an ``RDD[Row]``.
+ */
+case class OrcTableScan(
+ output: Seq[Attribute],
+ relation: OrcRelation,
+ columnPruningPred: Option[Expression])
+ extends LeafNode {
+
+ @transient
+ lazy val serde: OrcSerde = initSerde
+
+ private def initSerde(): OrcSerde = {
+ val serde = new OrcSerde
+ serde.initialize(null, relation.prop)
+ serde
+ }
+
+ override def execute(): RDD[Row] = {
+ val sc = sqlContext.sparkContext
+ val job = new Job(sc.hadoopConfiguration)
+
+ val conf: Configuration = job.getConfiguration
+ relation.path.split(",").foreach { curPath =>
+ val qualifiedPath = {
+ val path = new Path(curPath)
+ path.getFileSystem(conf).makeQualified(path)
+ }
+ FileInputFormat.addInputPath(job, qualifiedPath)
+ }
+
+ addColumnIds(output, relation, conf)
+ val inputClass = classOf[OrcInputFormat].asInstanceOf[
+ Class[_ <: org.apache.hadoop.mapred.InputFormat[NullWritable, Writable]]]
+
+ // use SpecificMutableRow to decrease GC garbage
+ val mutableRow = new SpecificMutableRow(output.map(_.dataType))
+ val attrsWithIndex = output.zipWithIndex
+ val rowRdd = sc.hadoopRDD[NullWritable, Writable](conf.asInstanceOf[JobConf], inputClass,
+ classOf[NullWritable], classOf[Writable]).map(_._2).mapPartitions { iter =>
+ val deserializer = serde
+ HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
+ }
+ rowRdd
+ }
+
+ /**
+ * Registers the ids and names of the requested columns in the Hadoop configuration, so that
+ * the ORC reader only reads the needed columns.
+ *
+ * @param output the attributes to be read
+ * @param relation the ORC relation providing the full schema
+ * @param conf the Hadoop configuration to update
+ */
+ def addColumnIds(output: Seq[Attribute], relation: OrcRelation, conf: Configuration) {
+ val fieldIdMap = relation.output.map(_.name).zipWithIndex.toMap
+
+ val ids = output.map(att => {
+ val realName = att.name.toLowerCase(Locale.ENGLISH)
+ fieldIdMap.getOrElse(realName, -1)
+ }).filter(_ >= 0).map(_.asInstanceOf[Integer])
+ if (ids != null && !ids.isEmpty) {
+ ColumnProjectionUtils.appendReadColumnIDs(conf, ids)
+ }
+
+ val names = output.map(_.name)
+ if (names != null && !names.isEmpty) {
+ ColumnProjectionUtils.appendReadColumnNames(conf, names)
+ }
+ }
+
+ /**
+ * Applies a (candidate) projection.
+ *
+ * @param prunedAttributes The list of attributes to be used in the projection.
+ * @return Pruned TableScan.
+ */
+ def pruneColumns(prunedAttributes: Seq[Attribute]): OrcTableScan = {
+ // TODO: prune projection
+ OrcTableScan(prunedAttributes, relation, columnPruningPred)
+ }
+}
+
+/**
+ * Operator that acts as a sink for queries on RDDs and can be used to
+ * store the output inside a directory of ORC files. This operator
+ * is similar to Hive's INSERT INTO TABLE operation in the sense that
+ * one can choose to either overwrite or append to a directory. Note
+ * that consecutive insertions to the same table must have compatible
+ * (source) schemas.
+ */
+private[sql] case class InsertIntoOrcTable(
+ relation: OrcRelation,
+ child: SparkPlan,
+ overwrite: Boolean = false)
+ extends UnaryNode with SparkHadoopMapRedUtil with org.apache.spark.Logging {
+
+ override def output = child.output
+
+ @transient val sc = sqlContext.sparkContext
+
+ @transient lazy val orcSerde = initSerde
+
+ private def initSerde(): OrcSerde = {
+ val serde: OrcSerde = new OrcSerde
+ serde.initialize(null, relation.prop)
+ serde
+ }
+
+ /**
+ * Inserts all rows into the Orc file.
+ */
+ override def execute() = {
+ val childRdd = child.execute()
+ assert(childRdd != null)
+
+ val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
+ val conf = job.getConfiguration
+
+ val fspath = new Path(relation.path)
+ val fs = fspath.getFileSystem(conf)
+
+ if (overwrite) {
+ try {
+ fs.delete(fspath, true)
+ } catch {
+ case e: IOException =>
+ throw new IOException(
+ s"Unable to clear output directory ${fspath.toString} prior"
+ + s" to InsertIntoOrcTable:\n${e.toString}")
+ }
+ }
+ val structType = StructType.fromAttributes(relation.output)
+ // get Type String to build typeInfo
+ val orcSchema = HiveMetastoreTypes.toMetastoreType(structType)
+
+ val writableRdd = childRdd.mapPartitions { iter =>
+ val typeInfo: TypeInfo =
+ TypeInfoUtils.getTypeInfoFromTypeString(orcSchema)
+ val standardOI = TypeInfoUtils
+ .getStandardJavaObjectInspectorFromTypeInfo(typeInfo)
+ .asInstanceOf[StructObjectInspector]
+ val fieldOIs = standardOI
+ .getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
+ val outputData = new Array[Any](fieldOIs.length)
+ iter.map { row =>
+ var i = 0
+ while (i < row.length) {
+ outputData(i) = HadoopTypeConverter.wrap((row(i), fieldOIs(i)))
+ i += 1
+ }
+ orcSerde.serialize(outputData, standardOI)
+ }
+ }
+
+ saveAsHadoopFile(writableRdd, relation.path, conf)
+
+ // We return the child RDD to allow chaining (alternatively, one could return nothing).
+ childRdd
+ }
+
+ private def saveAsHadoopFile(
+ rdd: RDD[Writable],
+ path: String,
+ @transient conf: Configuration) {
+ val job = new Job(conf)
+ val keyType = classOf[Void]
+ job.setOutputKeyClass(keyType)
+ job.setOutputValueClass(classOf[Writable])
+ FileOutputFormat.setOutputPath(job, new Path(path))
+
+ val wrappedConf = new SerializableWritable(job.getConfiguration)
+
+ val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+ val jobtrackerID = formatter.format(new Date())
+ val stageId = sqlContext.sparkContext.newRddId()
+
+ val taskIdOffset =
+ if (overwrite) {
+ 1
+ } else {
+ FileSystemHelper
+ .findMaxTaskId(
+ FileOutputFormat.getOutputPath(job).toString, job.getConfiguration, "orc") + 1
+ }
+
+ def getWriter(
+ outFormat: OrcOutputFormat,
+ conf: Configuration,
+ path: Path,
+ reporter: Reporter) = {
+ val fs = path.getFileSystem(conf)
+
+ outFormat.getRecordWriter(fs, conf.asInstanceOf[JobConf], path.toUri.getPath, reporter).
+ asInstanceOf[org.apache.hadoop.mapred.RecordWriter[NullWritable, Writable]]
+ }
+
+ def getCommitterAndWriter(offset: Int, context: TaskAttemptContext) = {
+ val outFormat = new OrcOutputFormat
+
+ val taskId: TaskID = context.getTaskAttemptID.getTaskID
+ val partition: Int = taskId.getId
+ val filename = s"part-r-${partition + offset}.orc"
+ val output: Path = FileOutputFormat.getOutputPath(context)
+ val committer = new FileOutputCommitter(output, context)
+ val path = new Path(committer.getWorkPath, filename)
+ val writer = getWriter(outFormat, wrappedConf.value, path, Reporter.NULL)
+ (committer, writer)
+ }
+
+ def writeShard(context: TaskContext, iter: Iterator[Writable]): Int = {
+ // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+ // around by taking a mod. We expect that no task will be attempted 2 billion times.
+ val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+ /* "reduce task" */
+ val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+ attemptNumber)
+ val hadoopContext = newTaskAttemptContext(wrappedConf.value.asInstanceOf[JobConf], attemptId)
+ val (committer, writer) = getCommitterAndWriter(taskIdOffset, hadoopContext)
+
+ while (iter.hasNext) {
+ val row = iter.next()
+ writer.write(NullWritable.get(), row)
+ }
+
+ writer.close(Reporter.NULL)
+ committer.commitTask(hadoopContext)
+ 1
+ }
+
+ /* apparently we need a TaskAttemptID to construct an OutputCommitter;
+ * however we're only going to use this local OutputCommitter for
+ * setupJob/commitJob, so we just use a dummy "map" task.
+ */
+ val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
+ val jobTaskContext = newTaskAttemptContext(
+ wrappedConf.value.asInstanceOf[JobConf], jobAttemptId)
+ val (jobCommitter, _) = getCommitterAndWriter(taskIdOffset, jobTaskContext)
+ jobCommitter.setupJob(jobTaskContext)
+ sc.runJob(rdd, writeShard _)
+ jobCommitter.commitJob(jobTaskContext)
+ }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala
new file mode 100644
index 0000000000000..14fe51b92d4b1
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{SQLContext, SchemaRDD}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.hadoop.hive.serde2.objectinspector._
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector}
+import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
+import org.apache.spark.sql.catalyst.expressions.Row
+/* Implicit conversions */
+import scala.collection.JavaConversions._
+package object orc {
+ class OrcSchemaRDD(
+ @transient val sqlContext1: SQLContext,
+ @transient val baseLogicalPlan1: LogicalPlan)
+ extends SchemaRDD(sqlContext1, baseLogicalPlan1) {
+ /**
+ * Saves the contents of this `SchemaRDD` as a ORC file, preserving the schema. Files that
+ * are written out using this method can be read back in as a SchemaRDD using the `orcFile`
+ * function.
+ * Note: you can only use it in HiveContext
+ *
+ * @group schema
+ */
+ def saveAsOrcFile(path: String): Unit = {
+ sqlContext.executePlan(WriteToOrcFile(path, logicalPlan)).toRdd
+ }
+ }
+
+ // Converts Catalyst values into the Java objects expected by OrcSerde; used by InsertIntoOrcTable.
+ object HadoopTypeConverter extends HiveInspectors {
+ def wrap(a: (Any, ObjectInspector)): Any = a match {
+ case (s: String, oi: JavaHiveVarcharObjectInspector) =>
+ new HiveVarchar(s, s.size)
+
+ case (bd: BigDecimal, oi: JavaHiveDecimalObjectInspector) =>
+ new HiveDecimal(bd.underlying())
+
+ case (row: Row, oi: StandardStructObjectInspector) =>
+ val struct = oi.create()
+ row.zip(oi.getAllStructFieldRefs: Seq[StructField]).foreach {
+ case (data, field) =>
+ oi.setStructFieldData(struct, field, wrap((data, field.getFieldObjectInspector)))
+ }
+ struct
+ case (s: Seq[_], oi: ListObjectInspector) =>
+ val wrappedSeq = s.map(elem => wrap((elem, oi.getListElementObjectInspector)))
+ seqAsJavaList(wrappedSeq)
+
+ case (m: Map[_, _], oi: MapObjectInspector) =>
+ val keyOi = oi.getMapKeyObjectInspector
+ val valueOi = oi.getMapValueObjectInspector
+ val wrappedMap = m.map { case (key, value) => wrap((key, keyOi)) -> wrap((value, valueOi)) }
+ mapAsJavaMap(wrappedMap)
+
+ case (obj, _) =>
+ obj
+ }
+ }
+
+ // Configuration key for the ORC compression codec; only takes effect with Hive 0.13.1 and above.
+ val orcDefaultCompressVar = "hive.exec.orc.default.compress"
+ // Predicate push down requires Hive 0.13.1 and above, so it is disabled for now.
+ val ORC_FILTER_PUSHDOWN_ENABLED = false
+ val SARG_PUSHDOWN = "sarg.pushdown"
+}
diff --git a/sql/hive/src/test/resources/data/files/orcfiles/.part-r-1.orc.crc b/sql/hive/src/test/resources/data/files/orcfiles/.part-r-1.orc.crc
new file mode 100644
index 0000000000000..048f7346c2c05
Binary files /dev/null and b/sql/hive/src/test/resources/data/files/orcfiles/.part-r-1.orc.crc differ
diff --git a/sql/hive/src/test/resources/data/files/orcfiles/.part-r-2.orc.crc b/sql/hive/src/test/resources/data/files/orcfiles/.part-r-2.orc.crc
new file mode 100644
index 0000000000000..52810863f2ae9
Binary files /dev/null and b/sql/hive/src/test/resources/data/files/orcfiles/.part-r-2.orc.crc differ
diff --git a/sql/hive/src/test/resources/data/files/orcfiles/_SUCCESS b/sql/hive/src/test/resources/data/files/orcfiles/_SUCCESS
new file mode 100755
index 0000000000000..e69de29bb2d1d
diff --git a/sql/hive/src/test/resources/data/files/orcfiles/part-r-1.orc b/sql/hive/src/test/resources/data/files/orcfiles/part-r-1.orc
new file mode 100755
index 0000000000000..3188e14c1fa48
Binary files /dev/null and b/sql/hive/src/test/resources/data/files/orcfiles/part-r-1.orc differ
diff --git a/sql/hive/src/test/resources/data/files/orcfiles/part-r-2.orc b/sql/hive/src/test/resources/data/files/orcfiles/part-r-2.orc
new file mode 100755
index 0000000000000..525488226e2a9
Binary files /dev/null and b/sql/hive/src/test/resources/data/files/orcfiles/part-r-2.orc differ
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/ORCQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/ORCQuerySuite.scala
new file mode 100644
index 0000000000000..791d7c01066de
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/ORCQuerySuite.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import java.util.Properties
+import java.io.File
+import org.scalatest.BeforeAndAfterAll
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.util.Utils
+import org.apache.hadoop.hive.ql.io.orc.CompressionKind
+
+case class TestRDDEntry(key: Int, value: String)
+
+case class NullReflectData(
+ intField: java.lang.Integer,
+ longField: java.lang.Long,
+ floatField: java.lang.Float,
+ doubleField: java.lang.Double,
+ booleanField: java.lang.Boolean)
+
+case class OptionalReflectData(
+ intField: Option[Int],
+ longField: Option[Long],
+ floatField: Option[Float],
+ doubleField: Option[Double],
+ booleanField: Option[Boolean])
+
+case class Nested(i: Int, s: String)
+
+case class Data(array: Seq[Int], nested: Nested)
+
+case class Contact(name: String, phone: String)
+
+case class Person(name: String, age: Int, contacts: Seq[Contact])
+
+case class AllDataTypes(
+ stringField: String,
+ intField: Int,
+ longField: Long,
+ floatField: Float,
+ doubleField: Double,
+ shortField: Short,
+ byteField: Byte,
+ booleanField: Boolean)
+
+case class AllDataTypesWithNonPrimitiveType(
+ stringField: String,
+ intField: Int,
+ longField: Long,
+ floatField: Float,
+ doubleField: Double,
+ shortField: Short,
+ byteField: Byte,
+ booleanField: Boolean,
+ array: Seq[Int],
+ arrayContainsNull: Seq[Option[Int]],
+ map: Map[Int, Long],
+ mapValueContainsNull: Map[Int, Option[Long]],
+ data: Data)
+
+case class BinaryData(binaryData: Array[Byte])
+
+class OrcQuerySuite extends QueryTest with BeforeAndAfterAll {
+
+ test("Read/Write All Types") {
+ val tempDir = getTempFilePath("orcTest").getCanonicalPath
+ val range = (0 to 255)
+ val data = sparkContext
+ .parallelize(range)
+ .map(x => AllDataTypes(
+ s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))
+
+ data.saveAsOrcFile(tempDir)
+ checkAnswer(
+ TestHive.orcFile(tempDir),
+ data.toSchemaRDD.collect().toSeq)
+
+ Utils.deleteRecursively(new File(tempDir))
+ }
+
+ test("read/write binary data") {
+ val tempDir = getTempFilePath("orcTest").getCanonicalPath
+ val range = (0 to 3)
+ sparkContext.parallelize(range)
+ .map(x => BinaryData(s"test$x".getBytes("utf8"))).saveAsOrcFile(tempDir)
+
+ assert(TestHive.orcFile(tempDir)
+ .map(r => new String(r(0).asInstanceOf[Array[Byte]], "utf8"))
+ .collect().toSet == Set("test0", "test1", "test2", "test3"))
+ Utils.deleteRecursively(new File(tempDir))
+ }
+
+ test("Read/Write All Types with non-primitive type") {
+ val tempDir = getTempFilePath("orcTest").getCanonicalPath
+ val range = (0 to 255)
+ val data = sparkContext.parallelize(range)
+ .map(x => AllDataTypesWithNonPrimitiveType(
+ s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
+ (0 until x),
+ (0 until x).map(Option(_).filter(_ % 3 == 0)),
+ (0 until x).map(i => i -> i.toLong).toMap,
+ (0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None),
+ Data((0 until x), Nested(x, s"$x"))))
+ data.saveAsOrcFile(tempDir)
+
+ checkAnswer(
+ TestHive.orcFile(tempDir),
+ data.toSchemaRDD.collect().toSeq)
+ Utils.deleteRecursively(new File(tempDir))
+ }
+
+ test("Creating case class RDD table") {
+ sparkContext.parallelize((1 to 100))
+ .map(i => TestRDDEntry(i, s"val_$i"))
+ .registerTempTable("tmp")
+ val rdd = sql("SELECT * FROM tmp").collect().sortBy(_.getInt(0))
+ var counter = 1
+ rdd.foreach {
+ // '===' does not like string comparison?
+ row => {
+ assert(row.getString(1).equals(s"val_$counter"),
+ s"row $counter value ${row.getString(1)} does not match val_$counter")
+ counter = counter + 1
+ }
+ }
+ }
+
+ test("Simple selection form orc table") {
+ val tempDir = getTempFilePath("orcTest").getCanonicalPath
+ val data = sparkContext.parallelize((1 to 10))
+ .map(i => Person(s"name_$i", i, (0 until 2).map { m =>
+ Contact(s"contact_$m", s"phone_$m") }))
+ data.saveAsOrcFile(tempDir)
+ val f = TestHive.orcFile(tempDir)
+ f.registerTempTable("tmp")
+ var rdd = sql("SELECT name FROM tmp where age <= 5")
+ assert(rdd.count() == 5)
+
+ rdd = sql("SELECT name, contacts FROM tmp where age > 5")
+ assert(rdd.count() == 5)
+
+ val contacts = rdd.flatMap(t => t(1).asInstanceOf[Seq[_]])
+ assert(contacts.count() == 10)
+ Utils.deleteRecursively(new File(tempDir))
+ }
+
+ test("save and load case class RDD with Nones as orc") {
+ val data = OptionalReflectData(None, None, None, None, None)
+ val rdd = sparkContext.parallelize(data :: data :: data :: Nil)
+ val tempDir = getTempFilePath("orcTest").getCanonicalPath
+ rdd.saveAsOrcFile(tempDir)
+ val readFile = TestHive.orcFile(tempDir)
+ val rowsSaved = readFile.collect()
+ assert(rowsSaved(0) === Seq.fill(5)(null))
+ Utils.deleteRecursively(new File(tempDir))
+ }
+
+ test("Compression options for writing to a Orcfile") {
+ val tempDir = getTempFilePath("orcTest").getCanonicalPath
+ val rdd = TestHive.sparkContext.parallelize((1 to 100))
+ .map(i => TestRDDEntry(i, s"val_$i"))
+
+ // Test the default compression codec; currently only ZLIB is supported.
+ rdd.saveAsOrcFile(tempDir)
+ val actualCodec = OrcFileOperator
+ .getMetaDataReader(new Path(tempDir), Some(new Configuration()))
+ .getCompression.name
+ assert(actualCodec == "ZLIB")
+
+ Utils.deleteRecursively(new File(tempDir))
+ }
+
+ test("Get ORC Schema with ORC Reader") {
+ val path = "src/test/resources/data/files/orcfiles"
+ val attributes = OrcFileOperator.orcSchema(
+ path, Some(TestHive.sparkContext.hadoopConfiguration), new Properties())
+ assert(attributes(0).dataType == StringType)
+ assert(attributes(1).dataType == IntegerType)
+ assert(attributes(2).dataType == LongType)
+ assert(attributes(3).dataType == FloatType)
+ assert(attributes(4).dataType == DoubleType)
+ assert(attributes(5).dataType == ShortType)
+ assert(attributes(6).dataType == ByteType)
+ assert(attributes(7).dataType == BooleanType)
+ }
+
+ ignore("Other Compression options for writing to an Orcfile only supported in hive 0.13.1 and above") {
+ TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "SNAPPY")
+ var tempDir = getTempFilePath("orcTest").getCanonicalPath
+ val rdd = sparkContext.parallelize((1 to 100))
+ .map(i => TestRDDEntry(i, s"val_$i"))
+ rdd.saveAsOrcFile(tempDir)
+ var actualCodec = OrcFileOperator.getMetaDataReader(new Path(tempDir), Some(new Configuration())).getCompression
+ assert(actualCodec == CompressionKind.SNAPPY)
+ Utils.deleteRecursively(new File(tempDir))
+
+ TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "NONE")
+ tempDir = getTempFilePath("orcTest").getCanonicalPath
+ rdd.saveAsOrcFile(tempDir)
+ actualCodec = OrcFileOperator.getMetaDataReader(new Path(tempDir), Some(new Configuration())).getCompression
+ assert(actualCodec == CompressionKind.NONE)
+ Utils.deleteRecursively(new File(tempDir))
+
+ TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "LZO")
+ tempDir = getTempFilePath("orcTest").getCanonicalPath
+ rdd.saveAsOrcFile(tempDir)
+ actualCodec = OrcFileOperator.getMetaDataReader(new Path(tempDir), Some(new Configuration())).getCompression
+ assert(actualCodec == CompressionKind.LZO)
+ Utils.deleteRecursively(new File(tempDir))
+ }
+}