Just for review #1
Changes from 15 commits

@@ -597,6 +597,129 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
</tr>
</table>

## ORC Files

[ORC](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC) is a columnar format supported in Hive. It provides a highly efficient way to store data on HDFS and speeds up query performance.
Spark SQL provides support for both reading and writing ORC files that automatically preserves the schema
of the original data.

### Loading Data Programmatically

Using the data from the above example:

<div class="codetabs">

<div data-lang="scala" markdown="1">

{% highlight scala %}
// Use HiveContext to read or write ORC files.
val sqlContext = new HiveContext(sc)
import sqlContext._
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using ORC.
people.registerTempTable("people")
people.saveAsOrcFile("people.orc")

// Read in the ORC file created above. ORC files are self-describing so the schema is preserved.
// The result of loading an ORC file is also a SchemaRDD.
val orcFile = sqlContext.orcFile("people.orc")

// ORC files can also be registered as tables and then used in SQL statements.
orcFile.registerTempTable("orcFile")
val teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

{% highlight java %}
// Use JavaHiveContext to read or write ORC files.
JavaHiveContext sqlContext = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc);
JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.

// JavaSchemaRDDs can be saved as ORC files, maintaining the schema information.
schemaPeople.saveAsOrcFile("people.orc");

// Read in the ORC file created above. ORC files are self-describing so the schema is preserved.
// The result of loading an ORC file is also a JavaSchemaRDD.
JavaSchemaRDD orcFile = sqlContext.orcFile("people.orc");

// ORC files can also be registered as tables and then used in SQL statements.
orcFile.registerTempTable("orcFile");
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();
{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

{% highlight python %}
# Use HiveContext to read or write ORC files.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

schemaPeople # The SchemaRDD from the previous example.

# SchemaRDDs can be saved as ORC files, maintaining the schema information.
schemaPeople.saveAsOrcFile("people.orc")

# Read in the ORC file created above. ORC files are self-describing so the schema is preserved.
# The result of loading an ORC file is also a SchemaRDD.
orcFile = sqlContext.orcFile("people.orc")

# ORC files can also be registered as tables and then used in SQL statements.
orcFile.registerTempTable("orcFile")
teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19")
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print teenName
{% endhighlight %}

</div>

</div>

### Configuration

Review comment: delete this

Configuration of ORC can be done using the `setConf` method on HiveContext or by running
`SET key=value` commands using SQL.

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
  <td><code>spark.sql.parquet.binaryAsString</code></td>
  <td>false</td>
  <td>
    Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do
    not differentiate between binary data and strings when writing out the Parquet schema. This
    flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.cacheMetadata</code></td>
  <td>false</td>
  <td>
    Turns on caching of Parquet schema metadata. Can speed up querying of static data.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.compression.codec</code></td>
  <td>snappy</td>
  <td>
    Sets the compression codec used when writing Parquet files. Acceptable values include:
    uncompressed, snappy, gzip, lzo.
  </td>
</tr>
</table>
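
For example, either mechanism can be used to change one of the settings listed above. This is a minimal sketch, assuming the `sqlContext` HiveContext created in the examples earlier in this section; the property name is taken from the table above.

{% highlight scala %}
// Set a property programmatically on the HiveContext...
sqlContext.setConf("spark.sql.parquet.cacheMetadata", "true")

// ...or run the equivalent SET command through SQL.
sqlContext.sql("SET spark.sql.parquet.cacheMetadata=true")
{% endhighlight %}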

## JSON Datasets
<div class="codetabs">

@@ -149,8 +149,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
   *
   * @group userf
   */
  def parquetFile(path: String): SchemaRDD =
    new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
  def parquetFile(path: String): SchemaRDD = {
    val a = new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
    a
  }

  /**
   * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].

@@ -25,6 +25,7 @@ import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{TypeTag, typeTag}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf

@@ -38,15 +39,21 @@ import org.apache.hadoop.hive.serde2.io.DateWritable

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.hive.orc.{OrcSchemaRDD, OrcRelation}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators}
import org.apache.spark.sql.catalyst.analysis.{OverrideCatalog, OverrideFunctionRegistry}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.ExtractPythonUdfs
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.{Command => PhysicalCommand}
import org.apache.spark.sql.execution.{Command => PhysicalCommand, _}
import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
import org.apache.spark.sql.catalyst.plans.logical.SetCommand
import scala.Some

Review comment: imports

import org.apache.spark.sql.catalyst.plans.logical.NativeCommand
import org.apache.spark.sql.hive.MetastoreRelation
import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
import org.apache.spark.sql.execution.LogicalRDD

/**
 * DEPRECATED: Use HiveContext instead.

@@ -109,6 +116,25 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
  @deprecated("hql() is deprecated as the sql function now parses using HiveQL by default. " +
    s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")
  def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery)

  /**
   * Creates a SchemaRDD from an RDD of case classes.
   *
   * @group userf
   */
  // implicit def createOrcSchemaRDD(rdd: SchemaRDD) = {
  //   new OrcSchemaRDD(rdd)
  // }

  /**
   * Creates a SchemaRDD from an RDD of case classes.
   *
   * @group userf
   */
  implicit override def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = {
    SparkPlan.currentContext.set(self)
    new OrcSchemaRDD(this,
      LogicalRDD(ScalaReflection.attributesFor[A], RDDConversions.productToRowRdd(rdd))(self))
  }
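
  // A minimal usage sketch (an assumption based on the guide example in this PR, not part
  // of the diff): with a HiveContext `hc` and the Person case class from the docs above,
  // this implicit conversion makes the ORC write path available directly on the RDD.
  //
  //   import hc._
  //   val people: RDD[Person] = ... // as in the programming-guide example
  //   people.saveAsOrcFile("people.orc") // available because the RDD becomes an OrcSchemaRDD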

  /**
   * Creates a table using the schema of the given class.

@@ -121,6 +147,48 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
  }

  /**
   * Loads an ORC file, returning the result as a [[SchemaRDD]].
   *
   * @group userf
   */
  def orcFile(path: String): SchemaRDD =
    new SchemaRDD(this, orc.OrcRelation(Seq.empty, path, Some(sparkContext.hadoopConfiguration), this))
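
  // A minimal read-path sketch (an assumption, not part of the diff): with a HiveContext
  // `hc` and the "people.orc" file written in the guide example, orcFile returns a
  // SchemaRDD that can be registered and queried like any other.
  //
  //   val people = hc.orcFile("people.orc")
  //   people.registerTempTable("people")
  //   hc.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19").collect()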

  // /**
  // * :: Experimental ::
  // * Creates an empty orc file with the schema of class `A`, which can be registered as a table.
  // * This registered table can be used as the target of future `insertInto` operations.
  // *
  // * {{{
  // *   val sqlContext = new HiveContext(...)
  // *   import sqlContext._
  // *
  // *   case class Person(name: String, age: Int)
  // *   createOrcFile[Person]("path/to/file.orc").registerTempTable("people")
  // *   sql("INSERT INTO people SELECT 'michael', 29")
  // * }}}
  // *
  // * @tparam A A case class type that describes the desired schema of the orc file to be
  // *           created.
  // * @param path The path where the directory containing parquet metadata should be created.
  // *             Data inserted into this table will also be stored at this location.
  // * @param allowExisting When false, an exception will be thrown if this directory already exists.
  // * @param conf A Hadoop configuration object that can be used to specify options to the parquet
  // *             output format.
  // *
  // * @group userf
  // */
  // @Experimental
  // def createOrcFile[A <: Product : TypeTag](
  //     path: String,
  //     allowExisting: Boolean = true,
  //     conf: Configuration = new Configuration()): SchemaRDD = {
  //   new SchemaRDD(
  //     this,
  //     OrcRelation.createEmpty(path, ScalaReflection.attributesFor[A], allowExisting, conf, this))
  // }

  /**
   * Analyzes the given table in the current database to generate statistics, which will be
   * used in query optimizations.

@@ -334,6 +402,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
      HiveCommandStrategy(self),
      TakeOrdered,
      ParquetOperations,
      OrcOperations,
      InMemoryScans,
      ParquetConversion, // Must be before HiveTableScans
      HiveTableScans,

Review comment: revert this file