90 changes: 90 additions & 0 deletions docs/sql-programming-guide.md
@@ -597,6 +597,96 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
</tr>
</table>

## ORC Files

[ORC](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC) is a columnar file format supported by Hive. It provides a highly efficient way to store data on HDFS and speeds up query processing.
Spark SQL provides support for both reading and writing ORC files that automatically preserves the schema
of the original data.

### Loading Data Programmatically

Using the data from the above example:

<div class="codetabs">

<div data-lang="scala" markdown="1">

{% highlight scala %}
// Use HiveContext to read or write ORC files.
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
import sqlContext._
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using ORC.
people.registerTempTable("people")
people.saveAsOrcFile("people.orc")

// Read in the ORC file created above. ORC files are self-describing so the schema is preserved.
// The result of loading an ORC file is also a SchemaRDD.
val orcFile = sqlContext.orcFile("people.orc")

// ORC files can also be registered as tables and then used in SQL statements.
orcFile.registerTempTable("orcFile")
val teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

{% highlight java %}
// Use JavaHiveContext to read or write ORC files.
JavaHiveContext sqlContext = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc);
JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.

// JavaSchemaRDDs can be saved as ORC files, maintaining the schema information.
schemaPeople.saveAsOrcFile("people.orc");

// Read in the ORC file created above. ORC files are self-describing so the schema is preserved.
// The result of loading an ORC file is also a JavaSchemaRDD.
JavaSchemaRDD orcFile = sqlContext.orcFile("people.orc");

// ORC files can also be registered as tables and then used in SQL statements.
orcFile.registerTempTable("orcFile");
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

{% highlight python %}
# Use HiveContext to read or write ORC files.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

schemaPeople # The SchemaRDD from the previous example.

# SchemaRDDs can be saved as ORC files, maintaining the schema information.
schemaPeople.saveAsOrcFile("people.orc")

# Read in the ORC file created above. ORC files are self-describing so the schema is preserved.
# The result of loading an ORC file is also a SchemaRDD.
orcFile = sqlContext.orcFile("people.orc")

# ORC files can also be registered as tables and then used in SQL statements.
orcFile.registerTempTable("orcFile")
teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19")
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
print teenName
{% endhighlight %}

</div>

</div>
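
Because the write path goes through the implicit `createSchemaRDD` conversion, any RDD of case classes can be written out as ORC directly. A minimal sketch reusing the `people` RDD and `sqlContext` from the Scala example above ("adults.orc" and the `adults` table name are hypothetical):

{% highlight scala %}
// Filter the case-class RDD and write the result out as ORC.
val adults: RDD[Person] = people.filter(p => p.age >= 18)
adults.saveAsOrcFile("adults.orc")

// Read it back; the schema (name, age) is preserved, so it can be queried immediately.
val adultsOrc = sqlContext.orcFile("adults.orc")
adultsOrc.registerTempTable("adults")
sqlContext.sql("SELECT COUNT(*) FROM adults").collect().foreach(println)
{% endhighlight %}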

## JSON Datasets
<div class="codetabs">

@@ -271,7 +271,7 @@ package object dsl {

object plans { // scalastyle:ignore
implicit class DslLogicalPlan(val logicalPlan: LogicalPlan) extends LogicalPlanFunctions {
def writeToFile(path: String) = WriteToFile(path, logicalPlan)
def writeToFile(path: String) = WriteToParquetFile(path, logicalPlan)
}
}
}
@@ -122,7 +122,7 @@ case class CreateTableAsSelect(
override lazy val resolved = (databaseName != None && childrenResolved)
}

case class WriteToFile(
case class WriteToParquetFile(
path: String,
child: LogicalPlan) extends UnaryNode {
override def output = child.output
@@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike {
@transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
// For various commands (like DDL) and queries with side effects, we force query optimization to
// happen right away to let these side effects take place eagerly.
case _: Command | _: InsertIntoTable | _: CreateTableAsSelect |_: WriteToFile =>
case _: Command | _: InsertIntoTable | _: CreateTableAsSelect |_: WriteToParquetFile =>
LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
case _ =>
baseLogicalPlan
@@ -73,7 +73,7 @@ private[sql] trait SchemaRDDLike {
* @group schema
*/
def saveAsParquetFile(path: String): Unit = {
sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
sqlContext.executePlan(WriteToParquetFile(path, logicalPlan)).toRdd
}

/**
@@ -198,8 +198,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

object ParquetOperations extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// TODO: need to support writing to other types of files. Unify the below code paths.
case logical.WriteToFile(path, child) =>
case logical.WriteToParquetFile(path, child) =>
val relation =
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
// Note: overwrite=false because otherwise the metadata we just created will be deleted
@@ -283,7 +283,8 @@ case class InsertIntoParquetTable(
1
} else {
FileSystemHelper
.findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
.findMaxTaskId(
NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration, "parquet") + 1
}

def writeShard(context: TaskContext, iter: Iterator[Row]): Int = {
@@ -496,7 +497,7 @@ private[parquet] object FilteringParquetRowInputFormat {
.build[FileStatus, Array[BlockLocation]]()
}

private[parquet] object FileSystemHelper {
private[sql] object FileSystemHelper {
def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
@@ -512,19 +513,37 @@ private[parquet] object FileSystemHelper {
fs.listStatus(path).map(_.getPath)
}

/**
* Finds the maximum taskid in the output file names at the given path.
*/
def findMaxTaskId(pathStr: String, conf: Configuration): Int = {
/**
* Lists files under the given path whose names end with the given extension.
*/
def listFiles(origPath: Path, conf: Configuration, extension: String): Seq[Path] = {
val fs = origPath.getFileSystem(conf)
if (fs == null) {
throw new IllegalArgumentException(
s"Path $origPath is incorrectly formatted")
}
val path = origPath.makeQualified(fs)
if (fs.exists(path)) {
fs.listStatus(path).map(_.getPath).filter(p => p.getName.endsWith(extension))
} else {
Seq.empty
}
}

/**
* Finds the maximum taskid in the output file names at the given path.
*/
def findMaxTaskId(pathStr: String, conf: Configuration, extension: String): Int = {
// filename pattern is part-r-<int>.$extension
require(Seq("orc", "parquet").contains(extension), s"Unsupported extension: $extension")
val nameP = new scala.util.matching.Regex(s"""part-r-(\\d{1,}).$extension""", "taskid")
val files = FileSystemHelper.listFiles(pathStr, conf)
// filename pattern is part-r-<int>.parquet
val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).parquet""", "taskid")
val hiddenFileP = new scala.util.matching.Regex("_.*")
files.map(_.getName).map {
case nameP(taskid) => taskid.toInt
case hiddenFileP() => 0
case other: String => {
sys.error("ERROR: attempting to append to set of Parquet files and found file" +
sys.error(s"ERROR: attempting to append to set of $extension files and found file " +
s"that does not match name pattern: $other")
0
}
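
For illustration, here is a standalone sketch, not part of the patch, of the task-id parsing that the extended findMaxTaskId performs. Output part files follow the pattern part-r-<taskid>.<extension>, and a subsequent append starts writing at the maximum existing task id plus one. The object and file names below are hypothetical:

{% highlight scala %}
// Hypothetical illustration of the part-file task-id parsing used by findMaxTaskId.
object TaskIdParsing {
  def maxTaskId(fileNames: Seq[String], extension: String): Int = {
    // Matches e.g. "part-r-00002.orc"; hidden files such as "_SUCCESS" simply do not match.
    val nameP = ("part-r-(\\d+)\\." + extension).r
    val taskIds = fileNames.collect { case nameP(taskid) => taskid.toInt }
    if (taskIds.isEmpty) 0 else taskIds.max
  }

  def main(args: Array[String]): Unit = {
    val files = Seq("part-r-00001.orc", "part-r-00002.orc", "_SUCCESS")
    println(maxTaskId(files, "orc") + 1) // the next task id to use: 3
  }
}
{% endhighlight %}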
@@ -24,6 +24,7 @@ import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{TypeTag, typeTag}
import scala.Some

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
@@ -38,14 +39,16 @@ import org.apache.hadoop.hive.serde2.io.DateWritable

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.hive.orc.OrcSchemaRDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators}
import org.apache.spark.sql.catalyst.analysis.{OverrideCatalog, OverrideFunctionRegistry}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.ExtractPythonUdfs
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.{Command => PhysicalCommand}
import org.apache.spark.sql.catalyst.plans.logical.SetCommand
import org.apache.spark.sql.catalyst.plans.logical.NativeCommand
import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand

/**
@@ -110,6 +113,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")
def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery)

/**
* Creates a SchemaRDD from an RDD of case classes.
*
* @group userf
*/
implicit override def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = {
SparkPlan.currentContext.set(self)
new OrcSchemaRDD(this,
LogicalRDD(ScalaReflection.attributesFor[A], RDDConversions.productToRowRdd(rdd))(self))
}

/**
* Creates a table using the schema of the given class.
*
@@ -121,6 +135,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
}

/**
* Loads an ORC file, returning the result as a [[SchemaRDD]].
*
* @group userf
*/
def orcFile(path: String): SchemaRDD = new SchemaRDD(
this, orc.OrcRelation(Seq.empty, path, Some(sparkContext.hadoopConfiguration), this))

/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
@@ -334,6 +356,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
HiveCommandStrategy(self),
TakeOrdered,
ParquetOperations,
OrcOperations,
InMemoryScans,
ParquetConversion, // Must be before HiveTableScans
HiveTableScans,
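
The OrcSchemaRDD returned by the overridden createSchemaRDD above is defined outside this section of the diff. As a purely hypothetical sketch, assuming a saveAsOrcFile that mirrors saveAsParquetFile and the WriteToOrcFile logical node used by the OrcOperations strategy further down, it might look roughly like this (every name not visible elsewhere in the patch is an assumption):

{% highlight scala %}
// Hypothetical sketch only; the real OrcSchemaRDD is defined elsewhere in the patch.
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.orc.WriteToOrcFile

class OrcSchemaRDD(
    @transient val hiveContext: HiveContext,
    @transient val baseOrcLogicalPlan: LogicalPlan)
  extends SchemaRDD(hiveContext, baseOrcLogicalPlan) {

  /** Writes the contents of this SchemaRDD out as an ORC file, preserving the schema. */
  def saveAsOrcFile(path: String): Unit = {
    hiveContext.executePlan(WriteToOrcFile(path, logicalPlan)).toRdd
  }
}
{% endhighlight %}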
@@ -22,12 +22,13 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
import org.apache.spark.sql.hive
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.orc.{WriteToOrcFile, InsertIntoOrcTable, OrcRelation, OrcTableScan}
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.{SQLContext, SchemaRDD}

@@ -221,4 +222,24 @@ private[hive] trait HiveStrategies {
case _ => Nil
}
}

object OrcOperations extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case WriteToOrcFile(path, child) =>
val relation =
OrcRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
InsertIntoOrcTable(relation, planLater(child), overwrite = true) :: Nil
case logical.InsertIntoTable(table: OrcRelation, partition, child, overwrite) =>
InsertIntoOrcTable(table, planLater(child), overwrite) :: Nil
case PhysicalOperation(projectList, filters, relation: OrcRelation) =>
// TODO: need to implement predicate push down.
val prunePushedDownFilters = identity[Seq[Expression]] _
pruneFilterProject(
projectList,
filters,
prunePushedDownFilters,
OrcTableScan(_, relation, None)) :: Nil
case _ => Nil
}
}
}