@@ -186,6 +186,100 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
   }
 
+  test("Compression options for writing to a Parquet file") {
+    val defaultParquetCompressionCodec = TestSQLContext.parquetCompressionCodec
+    import scala.collection.JavaConversions._
+
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+
+    // test default compression codec
+    rdd.saveAsParquetFile(path)
+    var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test uncompressed parquet file with property value "UNCOMPRESSED"
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "UNCOMPRESSED")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test uncompressed parquet file with property value "none"
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "none")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === "UNCOMPRESSED" :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test gzip compression codec
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test snappy compression codec
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "snappy")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // TODO: Lzo requires additional external setup steps so leave it out for now
+    // ref.: https://github.com/Parquet/parquet-mr/blob/parquet-1.5.0/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java#L169
+
+    // Set it back.
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, defaultParquetCompressionCodec)
+  }
+
   test("Read/Write All Types with non-primitive type") {
     val tempDir = getTempFilePath("parquetTest").getCanonicalPath
     val range = (0 to 255)
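
Aside: the assertion repeated for each codec above amounts to reading the Parquet footer and collecting the distinct codec names across all column chunks. Below is a minimal standalone sketch of that check written directly against the parquet-mr 1.x footer API (the same API the patch's ParquetTypesConverter.readMetaData wraps); the helper name distinctCodecs is hypothetical and not part of the patch.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import parquet.hadoop.ParquetFileReader
import scala.collection.JavaConversions._

// Hypothetical helper: returns the distinct compression codec names used by
// the column chunks of the Parquet file (or directory of part-files) at `path`.
def distinctCodecs(path: String, conf: Configuration): Seq[String] = {
  val footer = ParquetFileReader.readFooter(conf, new Path(path))
  footer.getBlocks                 // row groups
    .flatMap(_.getColumns)         // column chunks in each row group
    .map(_.getCodec.name())        // e.g. "GZIP", "SNAPPY", "UNCOMPRESSED"
    .distinct
}

// Usage, e.g. after writing with the codec set to "gzip":
//   assert(distinctCodecs(path, new Configuration()) === Seq("GZIP"))

A uniformly compressed file yields a single-element result, which is why the test compares against a one-element list (codec.toUpperCase :: Nil).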