Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
def isPrimitiveType(ctype: DataType): Boolean =
classOf[PrimitiveType] isAssignableFrom ctype.getClass

private var allowBinaryType: Boolean = true

def toPrimitiveDataType(parquetType: ParquetPrimitiveType): DataType =
parquetType.getPrimitiveTypeName match {
case ParquetPrimitiveTypeName.BINARY
if parquetType.getOriginalType == ParquetOriginalType.UTF8 => StringType
if (parquetType.getOriginalType == ParquetOriginalType.UTF8 || !allowBinaryType) => StringType
case ParquetPrimitiveTypeName.BINARY => BinaryType
case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
case ParquetPrimitiveTypeName.DOUBLE => DoubleType
Expand Down Expand Up @@ -369,9 +371,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
}
ParquetRelation.enableLogForwarding()

val children = fs.listStatus(path).filterNot {
_.getPath.getName == FileOutputCommitter.SUCCEEDED_FILE_NAME
}
allowBinaryType = conf.getBoolean("parquet.binarytype", true)

val children = fs.listStatus(path).filterNot(
status => (status.getPath.getName.charAt(0) == '.' ||
status.getPath.getName == FileOutputCommitter.SUCCEEDED_FILE_NAME)
)

// NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
// groups. Since Parquet schema is replicated among all row groups, we only need to touch a
Expand Down Expand Up @@ -406,8 +411,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
if (keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) {
convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
} else {
val attributes = convertToAttributes(
readMetaData(origPath, conf).getFileMetaData.getSchema)
val fileMetaData = readMetaData(origPath, conf).getFileMetaData
if (fileMetaData.getCreatedBy.contains("impala")) {
allowBinaryType = false
log.info(s"Impala parquet file founded, BinaryType disabled")
}
val attributes = convertToAttributes(fileMetaData.getSchema)
log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
attributes
}
Expand Down