@@ -250,8 +250,13 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
       val bos = new ByteArrayOutputStream()
       val out = new DataOutputStream(codec.compressedOutputStream(bos))
+      lazy val makeUnsafeRow = UnsafeProjection.create(schema)
       while (iter.hasNext && (n < 0 || count < n)) {
-        val row = iter.next().asInstanceOf[UnsafeRow]
+        val row = iter.next() match {
+          case unsafe: UnsafeRow => unsafe
+          case internal: InternalRow => makeUnsafeRow(internal)
+          case o => throw new UnsupportedOperationException(s"Cannot collect non-row object: $o")
+        }
         out.writeInt(row.getSizeInBytes)
         row.writeToStream(out, buffer)
         count += 1
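
Context for the collect path above: UnsafeProjection.create(schema) compiles a projection that rewrites any InternalRow into the binary UnsafeRow layout that getSizeInBytes and writeToStream expect. A minimal standalone sketch of that conversion (not part of this PR; the two-column schema and object name are invented for illustration):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object UnsafeProjectionSketch {
  def main(args: Array[String]): Unit = {
    // Schema describing the rows coming out of the iterator.
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))

    // Compiles a projection from any InternalRow to an UnsafeRow of this schema.
    val makeUnsafeRow = UnsafeProjection.create(schema)

    // A generic (non-unsafe) row, standing in for what the Parquet scan can now produce.
    val generic: InternalRow =
      new GenericInternalRow(Array[Any](1, UTF8String.fromString("a")))

    val unsafe: UnsafeRow = makeUnsafeRow(generic)
    println(unsafe.getSizeInBytes) // the value written by out.writeInt(...) above

    // Note: the projection reuses one result buffer, so copy() before caching the row.
  }
}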
@@ -417,9 +417,9 @@ class ParquetFileFormat
         // ParquetRecordReader returns UnsafeRow
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
-          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
+          new ParquetRecordReader[InternalRow](new ParquetReadSupport(convertTz), parquetFilter)
         } else {
-          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+          new ParquetRecordReader[InternalRow](new ParquetReadSupport(convertTz))
         }
         val iter = new RecordReaderIterator(reader)
         // SPARK-23457 Register a task completion lister before `initialization`.
@@ -435,10 +435,11 @@ class ParquetFileFormat
         // Object, then we can defer the cast until later!
         if (partitionSchema.length == 0) {
           // There is no partition columns
-          iter.asInstanceOf[Iterator[InternalRow]]
+          iter
         } else {
-          iter.asInstanceOf[Iterator[InternalRow]]
-            .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+          iter.map(d => joinedRow(d, file.partitionValues))
+          // iter.asInstanceOf[Iterator[InternalRow]]
+          //   .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
         }
       }
     }
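
For the partition-column branch above: joinedRow(d, file.partitionValues) yields a JoinedRow, a zero-copy InternalRow view over the data row and the partition values, whereas the commented-out path additionally ran a projection to materialize an UnsafeRow over the full schema. A standalone sketch with invented rows and schema; appendPartitionColumns here is only a stand-in for that projection:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, UnsafeProjection}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object JoinedRowSketch {
  def main(args: Array[String]): Unit = {
    // A data row read from the file, and the partition values for that file.
    val dataRow: InternalRow =
      new GenericInternalRow(Array[Any](42, UTF8String.fromString("x")))
    val partitionValues: InternalRow =
      new GenericInternalRow(Array[Any](UTF8String.fromString("2019-01-01")))

    // JoinedRow is itself an InternalRow, so it can be returned without copying
    // or converting -- this is what the new iter.map(...) line produces.
    val joined: InternalRow = new JoinedRow(dataRow, partitionValues)
    println(joined.numFields) // 3

    // The commented-out path would additionally project the joined row to an UnsafeRow:
    val fullSchema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType),
      StructField("dt", StringType)))
    val appendPartitionColumns = UnsafeProjection.create(fullSchema)
    println(appendPartitionColumns(joined).getSizeInBytes)
  }
}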
@@ -29,12 +29,12 @@ import org.apache.parquet.schema._
 import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
 
 /**
  * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
- * [[UnsafeRow]]s.
+ * [[InternalRow]]s.
  *
  * The API interface of [[ReadSupport]] is a little bit over complicated because of historical
  * reasons. In older versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] need to be
@@ -49,7 +49,7 @@ import org.apache.spark.sql.types._
  * to [[prepareForRead()]], but use a private `var` for simplicity.
  */
 private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
-  extends ReadSupport[UnsafeRow] with Logging {
+  extends ReadSupport[InternalRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
   def this() {
@@ -80,13 +80,13 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
   /**
    * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
    * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
-   * records to Catalyst [[UnsafeRow]]s.
+   * records to Catalyst [[InternalRow]]s.
    */
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
-      readContext: ReadContext): RecordMaterializer[UnsafeRow] = {
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
     log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
     val parquetRequestedSchema = readContext.getRequestedSchema
 
@@ -22,7 +22,7 @@ import java.util.TimeZone
 import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
 import org.apache.parquet.schema.MessageType
 
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -37,12 +37,12 @@ private[parquet] class ParquetRecordMaterializer(
     catalystSchema: StructType,
     schemaConverter: ParquetToSparkSchemaConverter,
     convertTz: Option[TimeZone])
-  extends RecordMaterializer[UnsafeRow] {
+  extends RecordMaterializer[InternalRow] {
 
   private val rootConverter =
     new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater)
 
-  override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord
+  override def getCurrentRecord: InternalRow = rootConverter.currentRecord
 
   override def getRootConverter: GroupConverter = rootConverter
 }
@@ -178,7 +178,8 @@ private[parquet] class ParquetRowConverter(
   /**
    * The [[UnsafeRow]] converted from an entire Parquet record.
    */
-  def currentRecord: UnsafeRow = unsafeProjection(currentRow)
+  def currentRecord: InternalRow = currentRow
+  // def currentRecord: InternalRow = unsafeProjection(currentRow)
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
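
One behavioral note on the last hunk: both before and after the change, the row handed back by currentRecord is a reused mutable buffer (previously the UnsafeProjection's result row, now the converter's own mutable row), so callers that hold onto records across reads must copy them. A standalone sketch of that reuse; using SpecificInternalRow as the mutable row is an assumption about the converter's internals:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object MutableRowReuseSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(StructField("id", IntegerType)))

    // Stand-in for the converter's single mutable row that is overwritten per record.
    val currentRow = new SpecificInternalRow(schema.map(_.dataType))

    currentRow.setInt(0, 1)
    val reference: InternalRow = currentRow        // what currentRecord now returns
    val snapshot: InternalRow = currentRow.copy()  // a stable copy

    currentRow.setInt(0, 2)                        // the "next record" overwrites the buffer
    println(reference.getInt(0)) // 2 -- the returned row reflects the overwrite
    println(snapshot.getInt(0))  // 1 -- the copy does not
  }
}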