
Commit 6822712

Author: Nathan Howell
1 parent: fa8234f

Split up JsonRDD2 into multiple objects

File tree: 9 files changed, +506 −482 lines

sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 2 additions & 2 deletions

@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
-import org.apache.spark.sql.json.JsonRDD
+import org.apache.spark.sql.json.{JacksonGenerator, JsonRDD}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
 import org.apache.spark.util.Utils
@@ -1369,7 +1369,7 @@ class DataFrame private[sql](
       new Iterator[String] {
         override def hasNext: Boolean = iter.hasNext
         override def next(): String = {
-          JsonRDD.rowToJSON(rowSchema, gen)(iter.next())
+          JacksonGenerator(rowSchema, gen)(iter.next())
           gen.flush()

           val json = writer.toString
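To illustrate the new call shape, here is a minimal, self-contained sketch of what toJSON does per row. Note that JacksonGenerator is private[sql], so code like this would have to live under the org.apache.spark.sql package; the schema, row, and writer setup below are hypothetical stand-ins for the surrounding (unshown) toJSON code, not part of the commit.

    import java.io.StringWriter

    import com.fasterxml.jackson.core.JsonFactory

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.json.JacksonGenerator
    import org.apache.spark.sql.types._

    // Hypothetical schema and row standing in for one element of the iterator above.
    val rowSchema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", IntegerType)))

    val writer = new StringWriter()
    val gen = new JsonFactory().createGenerator(writer)

    JacksonGenerator(rowSchema, gen)(Row("alice", 30))  // write one Row as a JSON object
    gen.flush()
    writer.toString  // {"name":"alice","age":30}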
sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala (new file)

Lines changed: 171 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.json

import com.fasterxml.jackson.core._

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._

private[sql] object InferSchema {
  /**
   * Infer the type of a collection of json records in three stages:
   *   1. Infer the type of each record
   *   2. Merge types by choosing the lowest type necessary to cover equal keys
   *   3. Replace any remaining null fields with string, the top type
   */
  def apply(
      json: RDD[String],
      samplingRatio: Double = 1.0,
      columnNameOfCorruptRecords: String): StructType = {
    require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
    val schemaData = if (samplingRatio > 0.99) {
      json
    } else {
      json.sample(withReplacement = false, samplingRatio, 1)
    }

    // perform schema inference on each row and merge afterwards
    schemaData.mapPartitions { iter =>
      val factory = new JsonFactory()
      iter.map { row =>
        try {
          val parser = factory.createParser(row)
          parser.nextToken()
          inferField(parser)
        } catch {
          case _: JsonParseException =>
            StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
        }
      }
    }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType) match {
      case st: StructType => nullTypeToStringType(st)
    }
  }

  /**
   * Infer the type of a json document from the parser's token stream
   */
  private def inferField(parser: JsonParser): DataType = {
    import com.fasterxml.jackson.core.JsonToken._
    parser.getCurrentToken match {
      case null | VALUE_NULL => NullType

      case FIELD_NAME =>
        parser.nextToken()
        inferField(parser)

      case VALUE_STRING if parser.getTextLength < 1 =>
        // Zero length strings and nulls have special handling to deal
        // with JSON generators that do not distinguish between the two.
        // To accurately infer types for empty strings that are really
        // meant to represent nulls we assume that the two are isomorphic
        // but will defer treating null fields as strings until all the
        // record fields' types have been combined.
        NullType

      case VALUE_STRING => StringType

      case START_OBJECT =>
        val builder = Seq.newBuilder[StructField]
        while (nextUntil(parser, END_OBJECT)) {
          builder += StructField(parser.getCurrentName, inferField(parser), nullable = true)
        }

        StructType(builder.result().sortBy(_.name))

      case START_ARRAY =>
        // If this JSON array is empty, we use NullType as a placeholder.
        // If this array is not empty in other JSON objects, we can resolve
        // the type as we pass through all JSON objects.
        var elementType: DataType = NullType
        while (nextUntil(parser, END_ARRAY)) {
          elementType = compatibleType(elementType, inferField(parser))
        }

        ArrayType(elementType)

      case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
        import JsonParser.NumberType._
        parser.getNumberType match {
          // For Integer values, use LongType by default.
          case INT | LONG => LongType
          // Since we do not have a data type backed by BigInteger,
          // when we see a Java BigInteger, we use DecimalType.
          case BIG_INTEGER | BIG_DECIMAL => DecimalType.Unlimited
          case FLOAT | DOUBLE => DoubleType
        }

      case VALUE_TRUE | VALUE_FALSE => BooleanType
    }
  }

  private def nullTypeToStringType(struct: StructType): StructType = {
    val fields = struct.fields.map {
      case StructField(fieldName, dataType, nullable, _) =>
        val newType = dataType match {
          case NullType => StringType
          case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
          case ArrayType(struct: StructType, containsNull) =>
            ArrayType(nullTypeToStringType(struct), containsNull)
          case struct: StructType => nullTypeToStringType(struct)
          case other: DataType => other
        }

        StructField(fieldName, newType, nullable)
    }

    StructType(fields)
  }

  /**
   * Remove top-level ArrayType wrappers and merge the remaining schemas
   */
  private def compatibleRootType: (DataType, DataType) => DataType = {
    case (ArrayType(ty1, _), ty2) => compatibleRootType(ty1, ty2)
    case (ty1, ArrayType(ty2, _)) => compatibleRootType(ty1, ty2)
    case (ty1, ty2) => compatibleType(ty1, ty2)
  }

  /**
   * Returns the most general data type for two given data types.
   */
  private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
    HiveTypeCoercion.findTightestCommonType(t1, t2).getOrElse {
      // t1 or t2 is a StructType, ArrayType, or an unexpected type.
      (t1, t2) match {
        case (other: DataType, NullType) => other
        case (NullType, other: DataType) => other
        case (StructType(fields1), StructType(fields2)) =>
          val newFields = (fields1 ++ fields2).groupBy(field => field.name).map {
            case (name, fieldTypes) =>
              val dataType = fieldTypes.view.map(_.dataType).reduce(compatibleType)
              StructField(name, dataType, nullable = true)
          }
          StructType(newFields.toSeq.sortBy(_.name))

        case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) =>
          ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2)

        // Everything else is widened to StringType, since every JSON value
        // can be read as a string.
        case (_, _) => StringType
      }
    }
  }
}
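A rough sketch of driving the new InferSchema object directly, showing the three stages from its doc comment. It is private[sql], so this would need to sit in an org.apache.spark.sql package; the input records and corrupt-record column name below are illustrative.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.json.InferSchema

    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("infer"))
    val json = sc.parallelize(Seq(
      """{"a": 1, "b": "x"}""",
      """{"a": 2.5, "c": [1, 2]}""",
      """not json at all"""))

    // Stage 1 infers a type per record; stage 2 merges them via compatibleType
    // (a: LongType vs DoubleType -> DoubleType); stage 3 turns leftover NullType
    // into StringType. The unparseable third record contributes a single string
    // field named by columnNameOfCorruptRecords.
    val schema = InferSchema(json, samplingRatio = 1.0,
      columnNameOfCorruptRecords = "_corrupt_record")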

sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala

Lines changed: 6 additions & 7 deletions

@@ -131,11 +131,10 @@ private[sql] class JSONRelation(

   override lazy val schema = userSpecifiedSchema.getOrElse {
     if (useJacksonStreamingAPI) {
-      JsonRDD2.nullTypeToStringType(
-        JsonRDD2.inferSchema(
-          baseRDD,
-          samplingRatio,
-          sqlContext.conf.columnNameOfCorruptRecord))
+      InferSchema(
+        baseRDD,
+        samplingRatio,
+        sqlContext.conf.columnNameOfCorruptRecord)
     } else {
       JsonRDD.nullTypeToStringType(
         JsonRDD.inferSchema(
@@ -147,7 +146,7 @@ private[sql] class JSONRelation(

   override def buildScan(): RDD[Row] = {
     if (useJacksonStreamingAPI) {
-      JsonRDD2.jsonStringToRow(
+      JacksonParser(
         baseRDD,
         schema,
         sqlContext.conf.columnNameOfCorruptRecord)
@@ -161,7 +160,7 @@ private[sql] class JSONRelation(

   override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
     if (useJacksonStreamingAPI) {
-      JsonRDD2.jsonStringToRow(
+      JacksonParser(
         baseRDD,
         StructType.fromAttributes(requiredColumns),
         sqlContext.conf.columnNameOfCorruptRecord)
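Both branches above are selected by JSONRelation's useJacksonStreamingAPI flag. As a hedged sketch of how these code paths are reached through the public API (the context setup and file path are hypothetical):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("json"))
    val sqlContext = new SQLContext(sc)

    // jsonFile builds a JSONRelation: asking for the schema exercises the
    // InferSchema path above, and collecting rows exercises buildScan/JacksonParser.
    val df = sqlContext.jsonFile("examples/people.json")  // hypothetical path
    df.printSchema()
    df.collect().foreach(println)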
sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala (new file)

Lines changed: 77 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.json

import scala.collection.Map

import com.fasterxml.jackson.core._

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

private[sql] object JacksonGenerator {
  /** Transforms a single Row to JSON using Jackson
   *
   * @param rowSchema the schema object used for conversion
   * @param gen a JsonGenerator object
   * @param row The row to convert
   */
  def apply(rowSchema: StructType, gen: JsonGenerator)(row: Row): Unit = {
    def valWriter: (DataType, Any) => Unit = {
      case (_, null) | (NullType, _) => gen.writeNull()
      case (StringType, v: String) => gen.writeString(v)
      case (TimestampType, v: java.sql.Timestamp) => gen.writeString(v.toString)
      case (IntegerType, v: Int) => gen.writeNumber(v)
      case (ShortType, v: Short) => gen.writeNumber(v)
      case (FloatType, v: Float) => gen.writeNumber(v)
      case (DoubleType, v: Double) => gen.writeNumber(v)
      case (LongType, v: Long) => gen.writeNumber(v)
      case (DecimalType(), v: java.math.BigDecimal) => gen.writeNumber(v)
      case (ByteType, v: Byte) => gen.writeNumber(v.toInt)
      case (BinaryType, v: Array[Byte]) => gen.writeBinary(v)
      case (BooleanType, v: Boolean) => gen.writeBoolean(v)
      case (DateType, v) => gen.writeString(v.toString)
      case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, udt.serialize(v))

      case (ArrayType(ty, _), v: Seq[_]) =>
        gen.writeStartArray()
        v.foreach(valWriter(ty, _))
        gen.writeEndArray()

      case (MapType(kv, vv, _), v: Map[_, _]) =>
        gen.writeStartObject()
        v.foreach { p =>
          gen.writeFieldName(p._1.toString)
          valWriter(vv, p._2)
        }
        gen.writeEndObject()

      case (StructType(ty), v: Row) =>
        gen.writeStartObject()
        ty.zip(v.toSeq).foreach {
          case (_, null) =>
          case (field, v) =>
            gen.writeFieldName(field.name)
            valWriter(field.dataType, v)
        }
        gen.writeEndObject()
    }

    valWriter(rowSchema, row)
  }
}
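Because valWriter recurses on ArrayType, MapType, and StructType, nested rows serialize to nested JSON. A small hypothetical example (again callable only from within org.apache.spark.sql, since the object is private[sql]):

    import java.io.StringWriter

    import com.fasterxml.jackson.core.JsonFactory

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.json.JacksonGenerator
    import org.apache.spark.sql.types._

    // Illustrative schema mixing an array and a map column.
    val schema = StructType(Seq(
      StructField("tags", ArrayType(StringType)),
      StructField("counts", MapType(StringType, LongType))))

    val writer = new StringWriter()
    val gen = new JsonFactory().createGenerator(writer)

    JacksonGenerator(schema, gen)(Row(Seq("a", "b"), Map("x" -> 1L)))
    gen.flush()
    writer.toString  // {"tags":["a","b"],"counts":{"x":1}}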
