diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 368c3d0008b07..233fe1225a43e 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -597,6 +597,96 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or +## ORC Files + +[ORC](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC) is a columnar format that is supported in Hive, it provides a highly efficient way to store data on HDFS to speed up query performance. +Spark SQL provides support for both reading and writing ORC files that automatically preserves the schema +of the original data. + +### Loading Data Programmatically + +Using the data from the above example: + +
+ +
+ +{% highlight scala %} +// Use HiveContext to read or write ORC File. +val sqlContext = new HiveContext(sc) +import sqlContext._ +val people: RDD[Person] = ... // An RDD of case class objects, from the previous example. + +// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using ORC. +rdd.registerTempTable("people") +rdd.saveAsOrcFile("people.orc") + +// Read in the ORC file created above. ORC files are self-describing so the schema is preserved. +// The result of loading a ORC file is also a SchemaRDD. +val orcFile = hiveContext.orcFile("pair.orc") + +//ORC files can also be registered as tables and then used in SQL statements. +orcFile.registerTempTable("orcFile") +val teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19") +teenagers.map(t => "Name: " + t(0)).collect().foreach(println) +{% endhighlight %} + +
+ +
+ +{% highlight java %} +// Use JavaHiveContext to read or write ORC File. +JavaHiveContext sqlContext = new org.apache.spark.sql.hive.api.java.HiveContext(sc); +JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example. + +// JavaSchemaRDDs can be saved as ORC files, maintaining the schema information. +schemaPeople.saveAsOrcFile("people.orc"); + +// Read in the ORC file created above. ORC files are self-describing so the schema is preserved. +// The result of loading a ORC file is also a JavaSchemaRDD. +JavaSchemaRDD orcFile = sqlContext.orcFile("people.orc"); + +// ORC files can also be registered as tables and then used in SQL statements. +orcFile.registerTempTable("orcFile"); +JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19"); +List teenagerNames = teenagers.map(new Function() { + public String call(Row row) { + return "Name: " + row.getString(0); + } +}).collect(); +{% endhighlight %} + +
+ +
+ +{% highlight python %} +# Use HiveContext to read or write ORC File. +from pyspark.sql import HiveContext +sqlContext = HiveContext(sc) + +schemaPeople # The SchemaRDD from the previous example. + +# SchemaRDDs can be saved as ORC files, maintaining the schema information. +schemaPeople.saveAsOrcFile("people.orc") + +# Read in the ORC file created above. ORC files are self-describing so the schema is preserved. +# The result of loading a ORC file is also a SchemaRDD. +orcFile = sqlContext.orcFile("people.orc") + +# ORC files can also be registered as tables and then used in SQL statements. +orcFile.registerTempTable("orcFile"); +teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19") +teenNames = teenagers.map(lambda p: "Name: " + p.name) +for teenName in teenNames.collect(): + print teenName +{% endhighlight %} + +
+ +
+ ## JSON Datasets
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 75b6e37c2a1f9..305dc9b3f21fb 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -271,7 +271,7 @@ package object dsl { object plans { // scalastyle:ignore implicit class DslLogicalPlan(val logicalPlan: LogicalPlan) extends LogicalPlanFunctions { - def writeToFile(path: String) = WriteToFile(path, logicalPlan) + def writeToFile(path: String) = WriteToPaquetFile(path, logicalPlan) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 14b03c7445c13..1c28fc8fe4797 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -122,7 +122,7 @@ case class CreateTableAsSelect( override lazy val resolved = (databaseName != None && childrenResolved) } -case class WriteToFile( +case class WriteToPaquetFile( path: String, child: LogicalPlan) extends UnaryNode { override def output = child.output diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala index 25ba7d88ba538..5b42011e35743 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala @@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike { @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match { // For various commands (like DDL) and queries with side effects, we force query optimization to // happen right away to let these side effects take place eagerly. - case _: Command | _: InsertIntoTable | _: CreateTableAsSelect |_: WriteToFile => + case _: Command | _: InsertIntoTable | _: CreateTableAsSelect |_: WriteToPaquetFile => LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) case _ => baseLogicalPlan @@ -73,7 +73,7 @@ private[sql] trait SchemaRDDLike { * @group schema */ def saveAsParquetFile(path: String): Unit = { - sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd + sqlContext.executePlan(WriteToPaquetFile(path, logicalPlan)).toRdd } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 79e4ddb8c4f5d..ee70032f0f7b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -198,8 +198,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object ParquetOperations extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - // TODO: need to support writing to other types of files. Unify the below code paths. - case logical.WriteToFile(path, child) => + case logical.WriteToPaquetFile(path, child) => val relation = ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext) // Note: overwrite=false because otherwise the metadata we just created will be deleted diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 5c6fa78ae3895..e52a5eea7a44b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -283,7 +283,8 @@ case class InsertIntoParquetTable( 1 } else { FileSystemHelper - .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1 + .findMaxTaskId( + NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration, "parquet") + 1 } def writeShard(context: TaskContext, iter: Iterator[Row]): Int = { @@ -496,7 +497,7 @@ private[parquet] object FilteringParquetRowInputFormat { .build[FileStatus, Array[BlockLocation]]() } -private[parquet] object FileSystemHelper { +private[sql] object FileSystemHelper { def listFiles(pathStr: String, conf: Configuration): Seq[Path] = { val origPath = new Path(pathStr) val fs = origPath.getFileSystem(conf) @@ -512,19 +513,37 @@ private[parquet] object FileSystemHelper { fs.listStatus(path).map(_.getPath) } - /** - * Finds the maximum taskid in the output file names at the given path. - */ - def findMaxTaskId(pathStr: String, conf: Configuration): Int = { + /** + * List files with special extension + */ + def listFiles(origPath: Path, conf: Configuration, extension: String): Seq[Path] = { + val fs = origPath.getFileSystem(conf) + if (fs == null) { + throw new IllegalArgumentException( + s"Path $origPath is incorrectly formatted") + } + val path = origPath.makeQualified(fs) + if (fs.exists(path)) { + fs.listStatus(path).map(_.getPath).filter(p => p.getName.endsWith(extension)) + } else { + Seq.empty + } + } + + /** + * Finds the maximum taskid in the output file names at the given path. + */ + def findMaxTaskId(pathStr: String, conf: Configuration, extension: String): Int = { + // filename pattern is part-r-.$extension + require(Seq("orc", "parquet").contains(extension), s"Unsupported extension: $extension") + val nameP = new scala.util.matching.Regex(s"""part-r-(\\d{1,}).$extension""", "taskid") val files = FileSystemHelper.listFiles(pathStr, conf) - // filename pattern is part-r-.parquet - val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).parquet""", "taskid") val hiddenFileP = new scala.util.matching.Regex("_.*") files.map(_.getName).map { case nameP(taskid) => taskid.toInt case hiddenFileP() => 0 case other: String => { - sys.error("ERROR: attempting to append to set of Parquet files and found file" + + sys.error(s"ERROR: attempting to append to set of $extension files and found file" + s"that does not match name pattern: $other") 0 } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 8b5a90159e1bb..8d6530d308456 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -24,6 +24,7 @@ import java.util.{ArrayList => JArrayList} import scala.collection.JavaConversions._ import scala.language.implicitConversions import scala.reflect.runtime.universe.{TypeTag, typeTag} +import scala.Some import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path @@ -38,14 +39,16 @@ import org.apache.hadoop.hive.serde2.io.DateWritable import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import org.apache.spark.sql.hive.orc.OrcSchemaRDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators} import org.apache.spark.sql.catalyst.analysis.{OverrideCatalog, OverrideFunctionRegistry} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.ExtractPythonUdfs -import org.apache.spark.sql.execution.QueryExecutionException +import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.{Command => PhysicalCommand} +import org.apache.spark.sql.catalyst.plans.logical.SetCommand +import org.apache.spark.sql.catalyst.plans.logical.NativeCommand import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand /** @@ -110,6 +113,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1") def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery) + /** + * Creates a SchemaRDD from an RDD of case classes. + * + * @group userf + */ + implicit override def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = { + SparkPlan.currentContext.set(self) + new OrcSchemaRDD(this, + LogicalRDD(ScalaReflection.attributesFor[A], RDDConversions.productToRowRdd(rdd))(self)) + } + /** * Creates a table using the schema of the given class. * @@ -121,6 +135,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting) } + /** + * Loads a ORC file, returning the result as a [[SchemaRDD]]. + * + * @group userf + */ + def orcFile(path: String): SchemaRDD = new SchemaRDD( + this, orc.OrcRelation(Seq.empty, path, Some(sparkContext.hadoopConfiguration), this)) + /** * Analyzes the given table in the current database to generate statistics, which will be * used in query optimizations. @@ -334,6 +356,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { HiveCommandStrategy(self), TakeOrdered, ParquetOperations, + OrcOperations, InMemoryScans, ParquetConversion, // Must be before HiveTableScans HiveTableScans, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 5c66322f1ed99..fdcd8aa58c922 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -22,12 +22,13 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate import org.apache.spark.sql.catalyst.planning._ -import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.types.StringType import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan} import org.apache.spark.sql.hive import org.apache.spark.sql.hive.execution._ +import org.apache.spark.sql.hive.orc.{WriteToOrcFile, InsertIntoOrcTable, OrcRelation, OrcTableScan} import org.apache.spark.sql.parquet.ParquetRelation import org.apache.spark.sql.{SQLContext, SchemaRDD} @@ -221,4 +222,24 @@ private[hive] trait HiveStrategies { case _ => Nil } } + + object OrcOperations extends Strategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case WriteToOrcFile(path, child) => + val relation = + OrcRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext) + InsertIntoOrcTable(relation, planLater(child), overwrite = true) :: Nil + case logical.InsertIntoTable(table: OrcRelation, partition, child, overwrite) => + InsertIntoOrcTable(table, planLater(child), overwrite) :: Nil + case PhysicalOperation(projectList, filters, relation: OrcRelation) => + // TODO: need to implement predict push down. + val prunePushedDownFilters = identity[Seq[Expression]] _ + pruneFilterProject( + projectList, + filters, + prunePushedDownFilters, + OrcTableScan(_, relation, None)) :: Nil + case _ => Nil + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala new file mode 100644 index 0000000000000..1c453f9ecd2bd --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import java.util.Properties +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.permission.FsAction +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector +import org.apache.hadoop.hive.ql.io.orc._ + +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedException, MultiInstanceRelation} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.hive.HiveMetastoreTypes +import org.apache.spark.sql.parquet.FileSystemHelper +import org.apache.spark.sql.SQLContext + +import scala.collection.JavaConversions._ + +private[sql] case class OrcRelation( + attributes: Seq[Attribute], + path: String, + @transient conf: Option[Configuration], + @transient sqlContext: SQLContext, + partitioningAttributes: Seq[Attribute] = Nil) + extends LeafNode with MultiInstanceRelation { + self: Product => + + val prop: Properties = new Properties + + override lazy val output = attributes ++ OrcFileOperator.orcSchema(path, conf, prop) + + // TODO: use statistics in ORC file + override lazy val statistics = Statistics(sizeInBytes = sqlContext.defaultSizeInBytes) + + override def newInstance() = + OrcRelation(attributes, path, conf, sqlContext).asInstanceOf[this.type] +} + +private[sql] object OrcRelation { + /** + * Creates a new OrcRelation and underlying Orcfile for the given LogicalPlan. Note that + * this is used inside [[org.apache.spark.sql.execution.SparkStrategies]] to + * create a resolved relation as a data sink for writing to a Orcfile. + * + * @param pathString The directory the ORCfile will be stored in. + * @param child The child node that will be used for extracting the schema. + * @param conf A configuration to be used. + * @return An empty OrcRelation with inferred metadata. + */ + def create( + pathString: String, + child: LogicalPlan, + conf: Configuration, + sqlContext: SQLContext): OrcRelation = { + if (!child.resolved) { + throw new UnresolvedException[LogicalPlan]( + child, + "Attempt to create Orc table from unresolved child") + } + val path = checkPath(pathString, false, conf) + new OrcRelation(child.output, path.toString, Some(conf), sqlContext) + } + + private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = { + require(pathStr != null, "Unable to create OrcRelation: path is null") + val origPath = new Path(pathStr) + val fs = origPath.getFileSystem(conf) + require(fs != null, s"Unable to create OrcRelation: incorrectly formatted path $pathStr") + val path = origPath.makeQualified(fs) + if (!allowExisting) { + require(!fs.exists(path), s"File $pathStr already exists.") + } + if (fs.exists(path)) { + require(fs.getFileStatus(path).getPermission.getUserAction.implies(FsAction.READ_WRITE), + s"Unable to create OrcRelation: path $path not read-writable") + } + path + } +} + +private[sql] object OrcFileOperator{ + def getMetaDataReader(origPath: Path, configuration: Option[Configuration]): Reader = { + val conf = configuration.getOrElse(new Configuration()) + val fs: FileSystem = origPath.getFileSystem(conf) + val orcFiles = FileSystemHelper.listFiles(origPath, conf, ".orc") + if (orcFiles == Seq.empty) { + // should return null when write to orc file + return null + } + OrcFile.createReader(fs, orcFiles(0)) + } + + def orcSchema( + path: String, + conf: Option[Configuration], + prop: Properties): Seq[Attribute] = { + // get the schema info through ORC Reader + val origPath = new Path(path) + val reader = getMetaDataReader(origPath, conf) + if (reader == null) { + // return empty seq when saveAsOrcFile + return Seq.empty + } + val inspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector] + // data types that is inspected by this inspector + val schema = inspector.getTypeName + // set prop here, initial OrcSerde need it + val fields = inspector.getAllStructFieldRefs + val (columns, columnTypes) = fields.map { f => + f.getFieldName -> f.getFieldObjectInspector.getTypeName + }.unzip + prop.setProperty("columns", columns.mkString(",")) + prop.setProperty("columns.types", columnTypes.mkString(":")) + + HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType].toAttributes + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala new file mode 100644 index 0000000000000..82aeb12a55a8b --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.hive.orc + +import java.io.IOException +import java.text.SimpleDateFormat +import java.util.{Locale, Date} + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter} +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.hadoop.io.{Writable, NullWritable} +import org.apache.hadoop.mapreduce.{TaskID, TaskAttemptContext, Job} +import org.apache.hadoop.mapred.{SparkHadoopMapRedUtil, Reporter, JobConf} +import org.apache.hadoop.hive.ql.io.orc.{OrcSerde, OrcInputFormat, OrcOutputFormat} +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils +import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, TypeInfo} + +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.parquet.FileSystemHelper +import org.apache.spark.{TaskContext, SerializableWritable} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode => LogicalUnaryNode} +import org.apache.spark.sql.catalyst.types.StructType +import org.apache.spark.sql.execution.UnaryNode +import org.apache.spark.sql.hive.{HiveMetastoreTypes, HadoopTableReader} + +import scala.collection.JavaConversions._ + +/** + * logical plan of writing to ORC file + */ +case class WriteToOrcFile( + path: String, + child: LogicalPlan) extends LogicalUnaryNode { + def output = child.output +} + +/** + * orc table scan operator. Imports the file that backs the given + * [[org.apache.spark.sql.hive.orc.OrcRelation]] as a ``RDD[Row]``. + */ +case class OrcTableScan( + output: Seq[Attribute], + relation: OrcRelation, + columnPruningPred: Option[Expression]) + extends LeafNode { + + @transient + lazy val serde: OrcSerde = initSerde + + private def initSerde(): OrcSerde = { + val serde = new OrcSerde + serde.initialize(null, relation.prop) + serde + } + + override def execute(): RDD[Row] = { + val sc = sqlContext.sparkContext + val job = new Job(sc.hadoopConfiguration) + + val conf: Configuration = job.getConfiguration + relation.path.split(",").foreach { curPath => + val qualifiedPath = { + val path = new Path(curPath) + path.getFileSystem(conf).makeQualified(path) + } + FileInputFormat.addInputPath(job, qualifiedPath) + } + + addColumnIds(output, relation, conf) + val inputClass = classOf[OrcInputFormat].asInstanceOf[ + Class[_ <: org.apache.hadoop.mapred.InputFormat[NullWritable, Writable]]] + + // use SpecificMutableRow to decrease GC garbage + val mutableRow = new SpecificMutableRow(output.map(_.dataType)) + val attrsWithIndex = output.zipWithIndex + val rowRdd = sc.hadoopRDD[NullWritable, Writable](conf.asInstanceOf[JobConf], inputClass, + classOf[NullWritable], classOf[Writable]).map(_._2).mapPartitions { iter => + val deserializer = serde + HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow) + } + rowRdd + } + + /** + * add column ids and names + * @param output + * @param relation + * @param conf + */ + def addColumnIds(output: Seq[Attribute], relation: OrcRelation, conf: Configuration) { + val fieldIdMap = relation.output.map(_.name).zipWithIndex.toMap + + val ids = output.map(att => { + val realName = att.name.toLowerCase(Locale.ENGLISH) + fieldIdMap.getOrElse(realName, -1) + }).filter(_ >= 0).map(_.asInstanceOf[Integer]) + if (ids != null && !ids.isEmpty) { + ColumnProjectionUtils.appendReadColumnIDs(conf, ids) + } + + val names = output.map(_.name) + if (names != null && !names.isEmpty) { + ColumnProjectionUtils.appendReadColumnNames(conf, names) + } + } + + /** + * Applies a (candidate) projection. + * + * @param prunedAttributes The list of attributes to be used in the projection. + * @return Pruned TableScan. + */ + def pruneColumns(prunedAttributes: Seq[Attribute]): OrcTableScan = { + // TODO: prune projection + OrcTableScan(prunedAttributes, relation, columnPruningPred) + } +} + +/** + * Operator that acts as a sink for queries on RDDs and can be used to + * store the output inside a directory of ORC files. This operator + * is similar to Hive's INSERT INTO TABLE operation in the sense that + * one can choose to either overwrite or append to a directory. Note + * that consecutive insertions to the same table must have compatible + * (source) schemas. + */ +private[sql] case class InsertIntoOrcTable( + relation: OrcRelation, + child: SparkPlan, + overwrite: Boolean = false) + extends UnaryNode with SparkHadoopMapRedUtil with org.apache.spark.Logging { + + override def output = child.output + + @transient val sc = sqlContext.sparkContext + + @transient lazy val orcSerde = initSerde + + private def initSerde(): OrcSerde = { + val serde: OrcSerde = new OrcSerde + serde.initialize(null, relation.prop) + serde + } + + /** + * Inserts all rows into the Orc file. + */ + override def execute() = { + val childRdd = child.execute() + assert(childRdd != null) + + val job = new Job(sqlContext.sparkContext.hadoopConfiguration) + val conf = job.getConfiguration + + val fspath = new Path(relation.path) + val fs = fspath.getFileSystem(conf) + + if (overwrite) { + try { + fs.delete(fspath, true) + } catch { + case e: IOException => + throw new IOException( + s"Unable to clear output directory ${fspath.toString} prior" + + s" to InsertIntoOrcTable:\n${e.toString}") + } + } + val structType = StructType.fromAttributes(relation.output) + // get Type String to build typeInfo + val orcSchema = HiveMetastoreTypes.toMetastoreType(structType) + + val writableRdd = childRdd.mapPartitions { iter => + val typeInfo: TypeInfo = + TypeInfoUtils.getTypeInfoFromTypeString(orcSchema) + val standardOI = TypeInfoUtils + .getStandardJavaObjectInspectorFromTypeInfo(typeInfo) + .asInstanceOf[StructObjectInspector] + val fieldOIs = standardOI + .getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val outputData = new Array[Any](fieldOIs.length) + iter.map { row => + var i = 0 + while (i < row.length) { + outputData(i) = HadoopTypeConverter.wrap((row(i), fieldOIs(i))) + i += 1 + } + orcSerde.serialize(outputData, standardOI) + } + } + + saveAsHadoopFile(writableRdd, relation.path, conf) + + // We return the child RDD to allow chaining (alternatively, one could return nothing). + childRdd + } + + private def saveAsHadoopFile( + rdd: RDD[Writable], + path: String, + @transient conf: Configuration) { + val job = new Job(conf) + val keyType = classOf[Void] + job.setOutputKeyClass(keyType) + job.setOutputValueClass(classOf[Writable]) + FileOutputFormat.setOutputPath(job, new Path(path)) + + val wrappedConf = new SerializableWritable(job.getConfiguration) + + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + val jobtrackerID = formatter.format(new Date()) + val stageId = sqlContext.sparkContext.newRddId() + + val taskIdOffset = + if (overwrite) { + 1 + } else { + FileSystemHelper + .findMaxTaskId( + FileOutputFormat.getOutputPath(job).toString, job.getConfiguration, "orc") + 1 + } + + def getWriter( + outFormat: OrcOutputFormat, + conf: Configuration, + path: Path, + reporter: Reporter) = { + val fs = path.getFileSystem(conf) + + outFormat.getRecordWriter(fs, conf.asInstanceOf[JobConf], path.toUri.getPath, reporter). + asInstanceOf[org.apache.hadoop.mapred.RecordWriter[NullWritable, Writable]] + } + + def getCommitterAndWriter(offset: Int, context: TaskAttemptContext) = { + val outFormat = new OrcOutputFormat + + val taskId: TaskID = context.getTaskAttemptID.getTaskID + val partition: Int = taskId.getId + val filename = s"part-r-${partition + offset}.orc" + val output: Path = FileOutputFormat.getOutputPath(context) + val committer = new FileOutputCommitter(output, context) + val path = new Path(committer.getWorkPath, filename) + val writer = getWriter(outFormat, wrappedConf.value, path, Reporter.NULL) + (committer, writer) + } + + def writeShard(context: TaskContext, iter: Iterator[Writable]): Int = { + // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it + // around by taking a mod. We expect that no task will be attempted 2 billion times. + val attemptNumber = (context.attemptId % Int.MaxValue).toInt + /* "reduce task" */ + val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId, + attemptNumber) + val hadoopContext = newTaskAttemptContext(wrappedConf.value.asInstanceOf[JobConf], attemptId) + val workerAndComitter = getCommitterAndWriter(taskIdOffset, hadoopContext) + val writer = workerAndComitter._2 + + while (iter.hasNext) { + val row = iter.next() + writer.write(NullWritable.get(), row) + } + + writer.close(Reporter.NULL) + workerAndComitter._1.commitTask(hadoopContext) + return 1 + } + + /* apparently we need a TaskAttemptID to construct an OutputCommitter; + * however we're only going to use this local OutputCommitter for + * setupJob/commitJob, so we just use a dummy "map" task. + */ + val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0) + val jobTaskContext = newTaskAttemptContext( + wrappedConf.value.asInstanceOf[JobConf], jobAttemptId) + val workerAndComitter = getCommitterAndWriter(taskIdOffset, jobTaskContext) + workerAndComitter._1.setupJob(jobTaskContext) + sc.runJob(rdd, writeShard _) + workerAndComitter._1.commitJob(jobTaskContext) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala new file mode 100644 index 0000000000000..14fe51b92d4b1 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.{SQLContext, SchemaRDD} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector} +import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} +import org.apache.spark.sql.catalyst.expressions.Row +/* Implicit conversions */ +import scala.collection.JavaConversions._ +package object orc { + class OrcSchemaRDD( + @transient val sqlContext1: SQLContext, + @transient val baseLogicalPlan1: LogicalPlan) + extends SchemaRDD(sqlContext1, baseLogicalPlan1) { + /** + * Saves the contents of this `SchemaRDD` as a ORC file, preserving the schema. Files that + * are written out using this method can be read back in as a SchemaRDD using the `orcFile` + * function. + * Note: you can only use it in HiveContext + * + * @group schema + */ + def saveAsOrcFile(path: String): Unit = { + sqlContext.executePlan(WriteToOrcFile(path, logicalPlan)).toRdd + } + } + + // TypeConverter for InsertIntoOrcTable + object HadoopTypeConverter extends HiveInspectors { + def wrap(a: (Any, ObjectInspector)): Any = a match { + case (s: String, oi: JavaHiveVarcharObjectInspector) => + new HiveVarchar(s, s.size) + + case (bd: BigDecimal, oi: JavaHiveDecimalObjectInspector) => + new HiveDecimal(bd.underlying()) + + case (row: Row, oi: StandardStructObjectInspector) => + val struct = oi.create() + row.zip(oi.getAllStructFieldRefs: Seq[StructField]).foreach { + case (data, field) => + oi.setStructFieldData(struct, field, wrap(data, field.getFieldObjectInspector)) + } + struct + case (s: Seq[_], oi: ListObjectInspector) => + val wrappedSeq = s.map(wrap(_, oi.getListElementObjectInspector)) + seqAsJavaList(wrappedSeq) + + case (m: Map[_, _], oi: MapObjectInspector) => + val keyOi = oi.getMapKeyObjectInspector + val valueOi = oi.getMapValueObjectInspector + val wrappedMap = m.map { case (key, value) => wrap(key, keyOi) -> wrap(value, valueOi) } + mapAsJavaMap(wrappedMap) + + case (obj, _) => + obj + } + } + + // for orc compression type, only take effect in hive 0.13.1 + val orcDefaultCompressVar = "hive.exec.orc.default.compress" + // for prediction push down in hive-0.13.1, don't enable it + val ORC_FILTER_PUSHDOWN_ENABLED = false + val SARG_PUSHDOWN = "sarg.pushdown" +} diff --git a/sql/hive/src/test/resources/data/files/orcfiles/.part-r-1.orc.crc b/sql/hive/src/test/resources/data/files/orcfiles/.part-r-1.orc.crc new file mode 100644 index 0000000000000..048f7346c2c05 Binary files /dev/null and b/sql/hive/src/test/resources/data/files/orcfiles/.part-r-1.orc.crc differ diff --git a/sql/hive/src/test/resources/data/files/orcfiles/.part-r-2.orc.crc b/sql/hive/src/test/resources/data/files/orcfiles/.part-r-2.orc.crc new file mode 100644 index 0000000000000..52810863f2ae9 Binary files /dev/null and b/sql/hive/src/test/resources/data/files/orcfiles/.part-r-2.orc.crc differ diff --git a/sql/hive/src/test/resources/data/files/orcfiles/_SUCCESS b/sql/hive/src/test/resources/data/files/orcfiles/_SUCCESS new file mode 100755 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/data/files/orcfiles/part-r-1.orc b/sql/hive/src/test/resources/data/files/orcfiles/part-r-1.orc new file mode 100755 index 0000000000000..3188e14c1fa48 Binary files /dev/null and b/sql/hive/src/test/resources/data/files/orcfiles/part-r-1.orc differ diff --git a/sql/hive/src/test/resources/data/files/orcfiles/part-r-2.orc b/sql/hive/src/test/resources/data/files/orcfiles/part-r-2.orc new file mode 100755 index 0000000000000..525488226e2a9 Binary files /dev/null and b/sql/hive/src/test/resources/data/files/orcfiles/part-r-2.orc differ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/ORCQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/ORCQuerySuite.scala new file mode 100644 index 0000000000000..791d7c01066de --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/ORCQuerySuite.scala @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import java.util.Properties +import java.io.File +import org.scalatest.BeforeAndAfterAll +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.catalyst.util.getTempFilePath +import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.util.Utils +import org.apache.hadoop.hive.ql.io.orc.CompressionKind + +case class TestRDDEntry(key: Int, value: String) + +case class NullReflectData( + intField: java.lang.Integer, + longField: java.lang.Long, + floatField: java.lang.Float, + doubleField: java.lang.Double, + booleanField: java.lang.Boolean) + +case class OptionalReflectData( + intField: Option[Int], + longField: Option[Long], + floatField: Option[Float], + doubleField: Option[Double], + booleanField: Option[Boolean]) + +case class Nested(i: Int, s: String) + +case class Data(array: Seq[Int], nested: Nested) + +case class Contact(name: String, phone: String) + +case class Person(name: String, age: Int, contacts: Seq[Contact]) + +case class AllDataTypes( + stringField: String, + intField: Int, + longField: Long, + floatField: Float, + doubleField: Double, + shortField: Short, + byteField: Byte, + booleanField: Boolean) + +case class AllDataTypesWithNonPrimitiveType( + stringField: String, + intField: Int, + longField: Long, + floatField: Float, + doubleField: Double, + shortField: Short, + byteField: Byte, + booleanField: Boolean, + array: Seq[Int], + arrayContainsNull: Seq[Option[Int]], + map: Map[Int, Long], + mapValueContainsNull: Map[Int, Option[Long]], + data: Data) + +case class BinaryData(binaryData: Array[Byte]) + +class OrcQuerySuite extends QueryTest with BeforeAndAfterAll { + + test("Read/Write All Types") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val range = (0 to 255) + val data = sparkContext + .parallelize(range) + .map(x => AllDataTypes( + s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0)) + + data.saveAsOrcFile(tempDir) + checkAnswer( + TestHive.orcFile(tempDir), + data.toSchemaRDD.collect().toSeq) + + Utils.deleteRecursively(new File(tempDir)) + } + + test("read/write binary data") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val range = (0 to 3) + sparkContext.parallelize(range) + .map(x => BinaryData(s"test$x".getBytes("utf8"))).saveAsOrcFile(tempDir) + + TestHive.orcFile(tempDir) + .map(r => new String(r(0).asInstanceOf[Array[Byte]], "utf8")) + .collect().toSet == Set("test0", "test1", "test2", "test3") + Utils.deleteRecursively(new File(tempDir)) + } + + test("Read/Write All Types with non-primitive type") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val range = (0 to 255) + val data = sparkContext.parallelize(range) + .map(x => AllDataTypesWithNonPrimitiveType( + s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0, + (0 until x), + (0 until x).map(Option(_).filter(_ % 3 == 0)), + (0 until x).map(i => i -> i.toLong).toMap, + (0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None), + Data((0 until x), Nested(x, s"$x")))) + data.saveAsOrcFile(tempDir) + + checkAnswer( + TestHive.orcFile(tempDir), + data.toSchemaRDD.collect().toSeq) + Utils.deleteRecursively(new File(tempDir)) + } + + test("Creating case class RDD table") { + sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + .registerTempTable("tmp") + val rdd = sql("SELECT * FROM tmp").collect().sortBy(_.getInt(0)) + var counter = 1 + rdd.foreach { + // '===' does not like string comparison? + row => { + assert(row.getString(1).equals(s"val_$counter"), s"row $counter value ${row.getString(1)} does not match val_$counter") + counter = counter + 1 + } + } + } + + test("Simple selection form orc table") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val data = sparkContext.parallelize((1 to 10)) + .map(i => Person(s"name_$i", i, (0 until 2).map{ m=> + Contact(s"contact_$m", s"phone_$m") })) + data.saveAsOrcFile(tempDir) + val f = TestHive.orcFile(tempDir) + f.registerTempTable("tmp") + var rdd = sql("SELECT name FROM tmp where age <= 5") + assert(rdd.count() == 5) + + rdd = sql("SELECT name, contacts FROM tmp where age > 5") + assert(rdd.count() == 5) + + val contacts = rdd.flatMap(t=>t(1).asInstanceOf[Seq[_]]) + assert(contacts.count() == 10) + Utils.deleteRecursively(new File(tempDir)) + } + + test("save and load case class RDD with Nones as orc") { + val data = OptionalReflectData(None, None, None, None, None) + val rdd = sparkContext.parallelize(data :: data :: data :: Nil) + val tempDir = getTempFilePath("orcTest").getCanonicalPath + rdd.saveAsOrcFile(tempDir) + val readFile = TestHive.orcFile(tempDir) + val rdd_saved = readFile.collect() + assert(rdd_saved(0) === Seq.fill(5)(null)) + Utils.deleteRecursively(new File(tempDir)) + } + + test("Compression options for writing to a Orcfile") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val rdd = TestHive.sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + + // test default compression codec, now only support zlib + rdd.saveAsOrcFile(tempDir) + val actualCodec = OrcFileOperator.getMetaDataReader(new Path(tempDir), Some(new Configuration())).getCompression.name + assert(actualCodec == "ZLIB") + + Utils.deleteRecursively(new File(tempDir)) + } + + test("Get ORC Schema with ORC Reader") { + val path = "src/test/resources/data/files/orcfiles" + val attributes = OrcFileOperator.orcSchema(path, Some(TestHive.sparkContext.hadoopConfiguration), new Properties()) + assert(attributes(0).dataType == StringType) + assert(attributes(1).dataType == IntegerType) + assert(attributes(2).dataType == LongType) + assert(attributes(3).dataType == FloatType) + assert(attributes(4).dataType == DoubleType) + assert(attributes(5).dataType == ShortType) + assert(attributes(6).dataType == ByteType) + assert(attributes(7).dataType == BooleanType) + } + + ignore("Other Compression options for writing to an Orcfile only supported in hive 0.13.1 and above") { + TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "SNAPPY") + var tempDir = getTempFilePath("orcTest").getCanonicalPath + val rdd = sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + rdd.saveAsOrcFile(tempDir) + var actualCodec = OrcFileOperator.getMetaDataReader(new Path(tempDir), Some(new Configuration())).getCompression + assert(actualCodec == CompressionKind.SNAPPY) + Utils.deleteRecursively(new File(tempDir)) + + TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "NONE") + tempDir = getTempFilePath("orcTest").getCanonicalPath + rdd.saveAsOrcFile(tempDir) + actualCodec = OrcFileOperator.getMetaDataReader(new Path(tempDir), Some(new Configuration())).getCompression + assert(actualCodec == CompressionKind.NONE) + Utils.deleteRecursively(new File(tempDir)) + + TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "LZO") + tempDir = getTempFilePath("orcTest").getCanonicalPath + rdd.saveAsOrcFile(tempDir) + actualCodec = OrcFileOperator.getMetaDataReader(new Path(tempDir), Some(new Configuration())).getCompression + assert(actualCodec == CompressionKind.LZO) + Utils.deleteRecursively(new File(tempDir)) + } +}