Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,22 @@ private[sql] class PostgresQuirks extends DriverQuirks {
StringType
} else if (sqlType == Types.OTHER && typeName.equals("inet")) {
StringType
} else if (sqlType == Types.OTHER && typeName.equals("uuid")) {
StringType
} else if (sqlType == Types.OTHER && typeName.equals("hstore")) {
MapType(keyType = StringType, valueType = StringType)
} else if (sqlType == Types.ARRAY) {
ArrayType(elementType = typeName match {
case "_varchar" => StringType
case "_text" => StringType
case "_name" => StringType
case "_int2" => IntegerType
case "_int4" => IntegerType
case "_int8" => LongType
case "_float4" => FloatType
case "_float8" => DoubleType
case _ => StringType
})
} else null
}

Expand Down
24 changes: 24 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow}
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources._

import scala.collection.JavaConverters._

private[sql] object JDBCRDD extends Logging {
/**
* Maps a JDBC type to a Catalyst type. This function is called only when
Expand Down Expand Up @@ -281,6 +283,8 @@ private[sql] class JDBCRDD(
case object StringConversion extends JDBCConversion
case object TimestampConversion extends JDBCConversion
case object BinaryConversion extends JDBCConversion
case object ArrayConversion extends JDBCConversion
case object MapConversion extends JDBCConversion

/**
* Maps a StructType to a type tag list.
Expand All @@ -298,6 +302,8 @@ private[sql] class JDBCRDD(
case StringType => StringConversion
case TimestampType => TimestampConversion
case BinaryType => BinaryConversion
case ArrayType(_,_) => ArrayConversion
case MapType(_,_,_) => MapConversion
case _ => throw new IllegalArgumentException(s"Unsupported field $sf")
}).toArray
}
Expand Down Expand Up @@ -356,6 +362,24 @@ private[sql] class JDBCRDD(
}
mutableRow.setLong(i, ans)
}
case ArrayConversion => {
val sqlArray = rs.getArray(pos)
val array = if (sqlArray == null) {
null
} else {
sqlArray.getArray.asInstanceOf[Array[_]].toSeq
}
mutableRow.update(i, array)
}
case MapConversion => {
val sqlMap = rs.getObject(pos)
val map = if (sqlMap == null) {
null
} else {
sqlMap.asInstanceOf[java.util.Map[String,String]].asScala
}
mutableRow.update(i, map)
}
}
if (rs.wasNull) mutableRow.setNullAt(i)
i = i + 1
Expand Down