@@ -17,21 +17,16 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.unsafe.types.UTF8String
-
 import scala.collection.mutable.HashSet
 
-import org.apache.spark.{AccumulatorParam, Accumulator, Logging}
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
-import org.apache.spark.sql.types._
+import org.apache.spark.{Accumulator, AccumulatorParam, Logging}
 
 /**
- * :: DeveloperApi ::
  * Contains methods for debugging query execution.
  *
  * Usage:
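
For orientation, a minimal usage sketch of the debug helpers this file defines (assuming a Spark shell where sqlContext is predefined; the table name src is illustrative):

    import org.apache.spark.sql.execution.debug._

    // Wraps each physical operator so per-operator tuple counts can be logged.
    sqlContext.sql("SELECT key FROM src").debug()
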
@@ -53,10 +48,8 @@ package object debug {
   }
 
   /**
-   * :: DeveloperApi ::
    * Augments [[DataFrame]]s with debug methods.
    */
-  @DeveloperApi
   implicit class DebugQuery(query: DataFrame) extends Logging {
     def debug(): Unit = {
       val plan = query.queryExecution.executedPlan
@@ -72,23 +65,6 @@ package object debug {
         case _ =>
       }
     }
-
-    def typeCheck(): Unit = {
-      val plan = query.queryExecution.executedPlan
-      val visited = new collection.mutable.HashSet[TreeNodeRef]()
-      val debugPlan = plan transform {
-        case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) =>
-          visited += new TreeNodeRef(s)
-          TypeCheck(s)
-      }
-      try {
-        logDebug(s"Results returned: ${debugPlan.execute().count()}")
-      } catch {
-        case e: Exception =>
-          def unwrap(e: Throwable): Throwable = if (e.getCause == null) e else unwrap(e.getCause)
-          logDebug(s"Deepest Error: ${unwrap(e)}")
-      }
-    }
   }
 
   private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
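
The removed typeCheck() used an idiom worth noting: transform the plan while tracking visited nodes by identity, so each operator is wrapped exactly once even when the transform revisits a subtree. A standalone sketch of that idiom (wrapOnce is a hypothetical helper name; TreeNodeRef, SparkPlan, and transform are the Spark APIs used in the code above):

    import scala.collection.mutable.HashSet

    import org.apache.spark.sql.catalyst.trees.TreeNodeRef
    import org.apache.spark.sql.execution.SparkPlan

    // Hypothetical helper: wrap every operator in a physical plan exactly once.
    // TreeNodeRef compares nodes by identity, so an operator that was already
    // wrapped (or a shared subtree) is skipped on later visits.
    def wrapOnce(plan: SparkPlan)(wrap: SparkPlan => SparkPlan): SparkPlan = {
      val visited = new HashSet[TreeNodeRef]()
      plan transform {
        case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) =>
          visited += new TreeNodeRef(s)
          wrap(s)
      }
    }
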
@@ -148,76 +124,4 @@ package object debug {
       }
     }
   }
-
-  /**
-   * Helper functions for checking that runtime types match a given schema.
-   */
-  private[sql] object TypeCheck {
-    def typeCheck(data: Any, schema: DataType): Unit = (data, schema) match {
-      case (null, _) =>
-
-      case (row: InternalRow, s: StructType) =>
-        row.toSeq(s).zip(s.map(_.dataType)).foreach { case (d, t) => typeCheck(d, t) }
-      case (a: ArrayData, ArrayType(elemType, _)) =>
-        a.foreach(elemType, (_, e) => {
-          typeCheck(e, elemType)
-        })
-      case (m: MapData, MapType(keyType, valueType, _)) =>
-        m.keyArray().foreach(keyType, (_, e) => {
-          typeCheck(e, keyType)
-        })
-        m.valueArray().foreach(valueType, (_, e) => {
-          typeCheck(e, valueType)
-        })
-
-      case (_: Long, LongType) =>
-      case (_: Int, IntegerType) =>
-      case (_: UTF8String, StringType) =>
-      case (_: Float, FloatType) =>
-      case (_: Byte, ByteType) =>
-      case (_: Short, ShortType) =>
-      case (_: Boolean, BooleanType) =>
-      case (_: Double, DoubleType) =>
-      case (_: Int, DateType) =>
-      case (_: Long, TimestampType) =>
-      case (v, udt: UserDefinedType[_]) => typeCheck(v, udt.sqlType)
-
-      case (d, t) => sys.error(s"Invalid data found: got $d (${d.getClass}) expected $t")
-    }
-  }
-
-  /**
-   * Augments [[DataFrame]]s with debug methods.
-   */
-  private[sql] case class TypeCheck(child: SparkPlan) extends SparkPlan {
-    import TypeCheck._
-
-    override def nodeName: String = "<TypeCheck>"
-
-    /* Only required when defining this class in a REPL.
-    override def makeCopy(args: Array[Object]): this.type =
-      TypeCheck(args(0).asInstanceOf[SparkPlan]).asInstanceOf[this.type]
-    */
-
-    def output: Seq[Attribute] = child.output
-
-    def children: List[SparkPlan] = child :: Nil
-
-    protected override def doExecute(): RDD[InternalRow] = {
-      child.execute().map { row =>
-        try typeCheck(row, child.schema) catch {
-          case e: Exception =>
-            sys.error(
-              s"""
-                 |ERROR WHEN TYPE CHECKING QUERY
-                 |==============================
-                 |$e
-                 |======== BAD TREE ============
-                 |$child
-               """.stripMargin)
-        }
-        row
-      }
-    }
-  }
 }
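
For reference, the deleted TypeCheck.typeCheck is just a recursive structural match between a runtime value and its declared type. A simplified, self-contained sketch of the same idea over plain Scala values (MiniType and its cases are a hypothetical miniature schema ADT standing in for Spark's DataType hierarchy):

    // Miniature stand-in for the removed TypeCheck.typeCheck: recursively verify
    // that a runtime value conforms to a declared schema, failing on mismatch.
    sealed trait MiniType
    case object IntT extends MiniType
    case object StringT extends MiniType
    case class ArrayT(elem: MiniType) extends MiniType
    case class StructT(fields: Seq[(String, MiniType)]) extends MiniType

    def typeCheck(data: Any, schema: MiniType): Unit = (data, schema) match {
      case (null, _)            => // null is treated as valid for any type
      case (_: Int, IntT)       =>
      case (_: String, StringT) =>
      case (xs: Seq[_], ArrayT(elem)) => xs.foreach(typeCheck(_, elem))
      case (m: Map[String, Any] @unchecked, StructT(fields)) =>
        fields.foreach { case (name, t) => typeCheck(m.getOrElse(name, null), t) }
      case (d, t) => sys.error(s"Invalid data found: got $d (${d.getClass}) expected $t")
    }

    // Example: passes for a well-typed record, throws for a mistyped field.
    typeCheck(
      Map("id" -> 1, "tags" -> Seq("a", "b")),
      StructT(Seq("id" -> IntT, "tags" -> ArrayT(StringT))))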