From 669f49262c0fc11cceebb02b0b5618eac9c08f8e Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Fri, 4 May 2018 12:03:11 -0700
Subject: [PATCH] Return InternalRow from ParquetFileFormat.

The non-vectorized Parquet read path (ParquetReadSupport,
ParquetRecordMaterializer, ParquetRowConverter) now produces InternalRow
instead of UnsafeRow, ParquetFileFormat no longer casts the resulting
iterator or applies an unsafe projection when appending partition values,
and SparkPlan's row-serialization path projects any non-UnsafeRow
InternalRow through an UnsafeProjection before writing it out.
---
 .../org/apache/spark/sql/execution/SparkPlan.scala    |  7 ++++++-
 .../datasources/parquet/ParquetFileFormat.scala       | 11 ++++++-----
 .../datasources/parquet/ParquetReadSupport.scala      | 10 +++++-----
 .../parquet/ParquetRecordMaterializer.scala           |  6 +++---
 .../datasources/parquet/ParquetRowConverter.scala     |  3 ++-
 5 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 398758a3331b4..4577d320c7c7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -250,8 +250,13 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
       val bos = new ByteArrayOutputStream()
       val out = new DataOutputStream(codec.compressedOutputStream(bos))
+      lazy val makeUnsafeRow = UnsafeProjection.create(schema)
       while (iter.hasNext && (n < 0 || count < n)) {
-        val row = iter.next().asInstanceOf[UnsafeRow]
+        val row = iter.next() match {
+          case unsafe: UnsafeRow => unsafe
+          case internal: InternalRow => makeUnsafeRow(internal)
+          case o => throw new UnsupportedOperationException(s"Cannot collect non-row object: $o")
+        }
         out.writeInt(row.getSizeInBytes)
         row.writeToStream(out, buffer)
         count += 1
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index d1f9e11ed4225..ab1de7c3935f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -417,9 +417,9 @@ class ParquetFileFormat
         // ParquetRecordReader returns UnsafeRow
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
-          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
+          new ParquetRecordReader[InternalRow](new ParquetReadSupport(convertTz), parquetFilter)
         } else {
-          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+          new ParquetRecordReader[InternalRow](new ParquetReadSupport(convertTz))
         }
         val iter = new RecordReaderIterator(reader)
         // SPARK-23457 Register a task completion lister before `initialization`.
@@ -435,10 +435,11 @@ class ParquetFileFormat
         // Object, then we can defer the cast until later!
         if (partitionSchema.length == 0) {
           // There is no partition columns
-          iter.asInstanceOf[Iterator[InternalRow]]
+          iter
         } else {
-          iter.asInstanceOf[Iterator[InternalRow]]
-            .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+          iter.map(d => joinedRow(d, file.partitionValues))
+//          iter.asInstanceOf[Iterator[InternalRow]]
+//            .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
         }
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 40ce5d5e0564e..5d9683245c70d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -29,12 +29,12 @@ import org.apache.parquet.schema._
 import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
 
 /**
  * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
- * [[UnsafeRow]]s.
+ * [[InternalRow]]s.
  *
  * The API interface of [[ReadSupport]] is a little bit over complicated because of historical
  * reasons. In older versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] need to be
@@ -49,7 +49,7 @@ import org.apache.spark.sql.types._
  * to [[prepareForRead()]], but use a private `var` for simplicity.
  */
 private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
-  extends ReadSupport[UnsafeRow] with Logging {
+  extends ReadSupport[InternalRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
   def this() {
@@ -80,13 +80,13 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
   /**
    * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
    * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
-   * records to Catalyst [[UnsafeRow]]s.
+   * records to Catalyst [[InternalRow]]s.
    */
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
-      readContext: ReadContext): RecordMaterializer[UnsafeRow] = {
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
     log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
 
     val parquetRequestedSchema = readContext.getRequestedSchema
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index b2459dd0e8bba..3098a332d3027 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -22,7 +22,7 @@ import java.util.TimeZone
 import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
 import org.apache.parquet.schema.MessageType
 
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -37,12 +37,12 @@ private[parquet] class ParquetRecordMaterializer(
     catalystSchema: StructType,
     schemaConverter: ParquetToSparkSchemaConverter,
     convertTz: Option[TimeZone])
-  extends RecordMaterializer[UnsafeRow] {
+  extends RecordMaterializer[InternalRow] {
 
   private val rootConverter =
     new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater)
 
-  override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord
+  override def getCurrentRecord: InternalRow = rootConverter.currentRecord
 
   override def getRootConverter: GroupConverter = rootConverter
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 1199725941842..523f722c00534 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -178,7 +178,8 @@ private[parquet] class ParquetRowConverter(
   /**
    * The [[UnsafeRow]] converted from an entire Parquet record.
    */
-  def currentRecord: UnsafeRow = unsafeProjection(currentRow)
+  def currentRecord: InternalRow = currentRow
+  // def currentRecord: InternalRow = unsafeProjection(currentRow)
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
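
For reviewers, a minimal standalone sketch (not part of the patch) of the conversion the new SparkPlan code relies on: UnsafeProjection.create(schema) builds a projection once, lazily, and any InternalRow that is not already an UnsafeRow is passed through it before its bytes are written out. The schema and row values below are made-up placeholders.

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  import org.apache.spark.unsafe.types.UTF8String

  object ToUnsafeSketch {
    def main(args: Array[String]): Unit = {
      // Placeholder schema; the real code uses the plan's output schema.
      val schema = StructType(Seq(
        StructField("id", IntegerType),
        StructField("name", StringType)))

      // Built lazily, as in the patch, so the projection is only generated
      // if a non-UnsafeRow actually shows up in the iterator.
      lazy val makeUnsafeRow = UnsafeProjection.create(schema)

      val generic: InternalRow =
        new GenericInternalRow(Array[Any](1, UTF8String.fromString("a")))

      // Same shape as the new match in SparkPlan: UnsafeRows pass through,
      // any other InternalRow is projected into unsafe format.
      val row: UnsafeRow = generic match {
        case unsafe: UnsafeRow => unsafe
        case internal: InternalRow => makeUnsafeRow(internal)
      }

      // getSizeInBytes and writeToStream are defined on UnsafeRow, which is
      // why the projection is needed before serialization.
      println(row.getSizeInBytes)
    }
  }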
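
Similarly, a rough sketch (again not from the patch, with placeholder rows) of why the asInstanceOf cast and the appendPartitionColumns projection can be dropped in ParquetFileFormat: the reader now yields InternalRow directly, and JoinedRow, which is itself an InternalRow, is enough to attach the partition values.

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
  import org.apache.spark.unsafe.types.UTF8String

  object JoinedRowSketch {
    def main(args: Array[String]): Unit = {
      // Stand-ins for the rows produced by ParquetRecordReader[InternalRow]
      // and for file.partitionValues.
      val dataRows: Iterator[InternalRow] = Iterator(
        new GenericInternalRow(Array[Any](1)),
        new GenericInternalRow(Array[Any](2)))
      val partitionValues: InternalRow =
        new GenericInternalRow(Array[Any](UTF8String.fromString("2018-05-04")))

      // Same shape as the patched else-branch: no cast, no UnsafeProjection.
      // The single JoinedRow instance is reused across records, so each
      // joined row must be consumed (or copied) before the next one is read.
      val joinedRow = new JoinedRow()
      val iter: Iterator[InternalRow] =
        dataRows.map(d => joinedRow(d, partitionValues))

      iter.foreach(r => println(s"${r.getInt(0)}, ${r.getUTF8String(1)}"))
    }
  }

Because the rows coming out of this path are no longer guaranteed to be UnsafeRows, the SparkPlan change above is what keeps the collect/serialize path working.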