@@ -19,17 +19,18 @@ package org.apache.spark.sql.execution.datasources.parquet

 import java.util.{Map => JMap}

-import scala.collection.JavaConversions.{iterableAsScalaIterable, mapAsJavaMap, mapAsScalaMap}
+import scala.collection.JavaConverters._

 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
 import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.Type.Repetition
+import org.apache.parquet.schema._

 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._

 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
   // Called after `init()` when initializing Parquet record reader.
@@ -44,7 +45,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
     val parquetRequestedSchema = readContext.getRequestedSchema

     val catalystRequestedSchema =
-      Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
+      Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { metadata =>
         metadata
           // First tries to read requested schema, which may result from projections
           .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
@@ -81,83 +82,191 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
     // `StructType` containing all requested columns.
     val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))

-    // Below we construct a Parquet schema containing all requested columns. This schema tells
-    // Parquet which columns to read.
-    //
-    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise,
-    // we have to fallback to the full file schema which contains all columns in the file.
-    // Obviously this may waste IO bandwidth since it may read more columns than requested.
-    //
-    // Two things to note:
-    //
-    // 1. It's possible that some requested columns don't exist in the target Parquet file. For
-    //    example, in the case of schema merging, the globally merged schema may contain extra
-    //    columns gathered from other Parquet files. These columns will be simply filled with nulls
-    //    when actually reading the target Parquet file.
-    //
-    // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to
-    //    Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to
-    //    non-standard behaviors of some Parquet libraries/tools. For example, a Parquet file
-    //    containing a single integer array field `f1` may have the following legacy 2-level
-    //    structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          required INT32 element;
-    //        }
-    //      }
-    //
-    //    while `CatalystSchemaConverter` may generate a standard 3-level structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          repeated group list {
-    //            required INT32 element;
-    //          }
-    //        }
-    //      }
-    //
-    //    Apparently, we can't use the 2nd schema to read the target Parquet file as they have
-    //    different physical structures.
     val parquetRequestedSchema =
       maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
-        val toParquet = new CatalystSchemaConverter(conf)
-        val fileSchema = context.getFileSchema.asGroupType()
-        val fileFieldNames = fileSchema.getFields.map(_.getName).toSet
-
-        StructType
-          // Deserializes the Catalyst schema of requested columns
-          .fromString(schemaString)
-          .map { field =>
-            if (fileFieldNames.contains(field.name)) {
-              // If the field exists in the target Parquet file, extracts the field type from the
-              // full file schema and makes a single-field Parquet schema
-              new MessageType("root", fileSchema.getType(field.name))
-            } else {
-              // Otherwise, just resorts to `CatalystSchemaConverter`
-              toParquet.convert(StructType(Array(field)))
-            }
-          }
-          // Merges all single-field Parquet schemas to form a complete schema for all requested
-          // columns. Note that it's possible that no columns are requested at all (e.g., count
-          // some partition column of a partitioned Parquet table). That's why `fold` is used here
-          // and always fallback to an empty Parquet schema.
-          .fold(new MessageType("root")) {
-            _ union _
-          }
+        val catalystRequestedSchema = StructType.fromString(schemaString)
+        CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
       }

     val metadata =
       Map.empty[String, String] ++
         maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
         maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)

-    new ReadContext(parquetRequestedSchema, metadata)
+    new ReadContext(parquetRequestedSchema, metadata.asJava)
   }
 }

 private[parquet] object CatalystReadSupport {
   val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"

   val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  /**
+   * Tailors `parquetSchema` according to `catalystSchema` by removing column paths that don't
+   * exist in `catalystSchema`, and adding those that only exist in `catalystSchema`.
+   */
+  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+    Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
+  }
+
+  private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+    catalystType match {
+      case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
+        // Only clips array types with nested type as element type
+        clipParquetListType(parquetType.asGroupType(), t.elementType)
+
+      case t: MapType
+          if !isPrimitiveCatalystType(t.keyType) ||
+             !isPrimitiveCatalystType(t.valueType) =>
+        // Only clips map types with nested key type or value type
+        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+
+      case t: StructType =>
+        clipParquetGroup(parquetType.asGroupType(), t)
+
+      case _ =>
+        // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be
+        // able to be mapped to desired user-space types. So UDTs shouldn't participate in
+        // schema merging.
+        parquetType
+    }
+  }
+
+  /**
+   * Whether a Catalyst [[DataType]] is primitive. Primitive [[DataType]] is not equivalent to
+   * [[AtomicType]]. For example, [[CalendarIntervalType]] is primitive, but it's not an
+   * [[AtomicType]].
+   */
+  private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
+    dataType match {
+      case _: ArrayType | _: MapType | _: StructType => false
+      case _ => true
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]]. The element type
+   * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or
+   * a [[StructType]].
+   */
+  private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
+    // Precondition of this method, should only be called for lists with nested element types.
+    assert(!isPrimitiveCatalystType(elementType))
+
+    // An unannotated repeated group should be interpreted as a required list of required
+    // elements, so the list element type is just the group itself. Clip it.
+    if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
+      clipParquetType(parquetList, elementType)
+    } else {
+      assert(
+        parquetList.getOriginalType == OriginalType.LIST,
+        "Invalid Parquet schema. " +
+          "Original type of annotated Parquet lists must be LIST: " +
+          parquetList.toString)
+
+      assert(
+        parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED),
+        "Invalid Parquet schema. " +
+          "LIST-annotated group should only have exactly one repeated field: " +
+          parquetList)
+
+      // Precondition of this method, should only be called for lists with nested element types.
+      assert(!parquetList.getType(0).isPrimitive)
+
+      val repeatedGroup = parquetList.getType(0).asGroupType()
+
+      // If the repeated field is a group with multiple fields, or the repeated field is a group
+      // with one field and is named either "array" or uses the LIST-annotated group's name with
+      // "_tuple" appended, then the repeated type is the element type and elements are required.
+      // Build a new LIST-annotated group with the clipped `repeatedGroup` as element type and
+      // its only field.
+      if (
+        repeatedGroup.getFieldCount > 1 ||
+        repeatedGroup.getName == "array" ||
+        repeatedGroup.getName == parquetList.getName + "_tuple"
+      ) {
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(clipParquetType(repeatedGroup, elementType))
+          .named(parquetList.getName)
+      } else {
+        // Otherwise, the repeated field's type is the element type with the repeated field's
+        // repetition.
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(
+            Types
+              .repeatedGroup()
+              .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+              .named(repeatedGroup.getName))
+          .named(parquetList.getName)
+      }
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. Either key type or
+   * value type of the [[MapType]] must be a nested type, namely an [[ArrayType]], a [[MapType]],
+   * or a [[StructType]].
+   */
+  private def clipParquetMapType(
+      parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
+    // Precondition of this method, only handles maps with nested key types or value types.
+    assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
+
+    val repeatedGroup = parquetMap.getType(0).asGroupType()
+    val parquetKeyType = repeatedGroup.getType(0)
+    val parquetValueType = repeatedGroup.getType(1)
+
+    val clippedRepeatedGroup =
+      Types
+        .repeatedGroup()
+        .as(repeatedGroup.getOriginalType)
+        .addField(clipParquetType(parquetKeyType, keyType))
+        .addField(clipParquetType(parquetValueType, valueType))
+        .named(repeatedGroup.getName)
+
+    Types
+      .buildGroup(parquetMap.getRepetition)
+      .as(parquetMap.getOriginalType)
+      .addField(clippedRepeatedGroup)
+      .named(parquetMap.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+   *
+   * @return A clipped [[GroupType]], which has at least one field.
+   * @note Parquet doesn't allow creating empty [[GroupType]] instances except for empty
+   *       [[MessageType]]. An empty requested schema is legal for column pruning, but then only
+   *       the top-level [[MessageType]] may be empty.
+   */
+  private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
+    Types
+      .buildGroup(parquetRecord.getRepetition)
+      .as(parquetRecord.getOriginalType)
+      .addFields(clippedParquetFields: _*)
+      .named(parquetRecord.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+   *
+   * @return A list of clipped [[GroupType]] fields, which can be empty.
+   */
+  private def clipParquetGroupFields(
+      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
+    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+    val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
+    structType.map { f =>
+      parquetFieldMap
+        .get(f.name)
+        .map(clipParquetType(_, f.dataType))
+        .getOrElse(toParquet.convertField(f))
+    }
+  }
 }
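
Below is a minimal usage sketch (not part of this commit) of what the new `clipParquetSchema` produces. It assumes package-private access to `CatalystReadSupport` (e.g. from a test suite placed in the same package) and uses Parquet's `MessageTypeParser`; all schema and field names here are made up for illustration.

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.parquet.schema.MessageTypeParser

import org.apache.spark.sql.types._

object ClipParquetSchemaSketch {
  def main(args: Array[String]): Unit = {
    // Full file schema: a struct column `f0`, a legacy 2-level list `f1`
    // (repeated group named "array"), and a primitive column `f2`.
    val fileSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required group f0 {
        |    optional int32 a;
        |    optional binary b (UTF8);
        |  }
        |  optional group f1 (LIST) {
        |    repeated group array {
        |      optional int32 a;
        |      optional int64 b;
        |    }
        |  }
        |  optional int64 f2;
        |}
      """.stripMargin)

    // Requested Catalyst schema: only `f0.a` and `f1[].a`, plus a column `f3`
    // that is missing from the file (e.g. introduced by schema merging).
    val catalystSchema = StructType(Seq(
      StructField("f0", StructType(Seq(StructField("a", IntegerType)))),
      StructField("f1", ArrayType(StructType(Seq(StructField("a", IntegerType))))),
      StructField("f3", DoubleType)))

    // Expected output: `f0` and the legacy list `f1` keep their physical
    // structure but are clipped to field `a` only, `f2` is dropped, and `f3`
    // is converted from its Catalyst type since the file has no counterpart.
    println(CatalystReadSupport.clipParquetSchema(fileSchema, catalystSchema))
  }
}

Note how the legacy 2-level layout of `f1` survives clipping: since the repeated group is named "array", its clipped form is kept as the element type instead of being rewritten into the standard 3-level structure, which is exactly why plain `CatalystSchemaConverter` conversion was not enough here.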