From c28777051b779eb0d029f45aae3f5c86c76600a5 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 22 Oct 2024 15:18:20 +0800 Subject: [PATCH 1/4] [SPARK-50066][SQL] Codegen Support for `SchemaOfXml` (by Invoke & RuntimeReplaceable) --- .../catalyst/expressions/xmlExpressions.scala | 50 +++++++------------ 1 file changed, 19 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala index f3f652b393f76..5bb96dd9a261d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala @@ -21,10 +21,12 @@ import java.io.CharArrayWriter import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess} -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode} -import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, FailureSafeParser, PermissiveMode} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.expressions.objects.Invoke +import org.apache.spark.sql.catalyst.expressions.xml.SchemaOfXmlEvaluator +import org.apache.spark.sql.catalyst.util.{FailFastMode, FailureSafeParser, PermissiveMode} import org.apache.spark.sql.catalyst.util.TypeUtils._ -import org.apache.spark.sql.catalyst.xml.{StaxXmlGenerator, StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions} +import org.apache.spark.sql.catalyst.xml.{StaxXmlGenerator, StaxXmlParser, ValidatorUtil, XmlOptions} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.types.StringTypeWithCollation @@ -149,7 +151,9 @@ case class XmlToStructs( case class SchemaOfXml( child: Expression, options: Map[String, String]) - extends UnaryExpression with CodegenFallback with QueryErrorsBase { + extends UnaryExpression + with RuntimeReplaceable + with QueryErrorsBase { def this(child: Expression) = this(child, Map.empty[String, String]) @@ -161,17 +165,6 @@ case class SchemaOfXml( override def nullable: Boolean = false - @transient - private lazy val xmlOptions = new XmlOptions(options, "UTC") - - @transient - private lazy val xmlInferSchema = { - if (xmlOptions.parseMode == DropMalformedMode) { - throw QueryCompilationErrors.parseModeUnsupportedError("schema_of_xml", xmlOptions.parseMode) - } - new XmlInferSchema(xmlOptions, caseSensitive = SQLConf.get.caseSensitiveAnalysis) - } - @transient private lazy val xml = child.eval().asInstanceOf[UTF8String] @@ -192,26 +185,21 @@ case class SchemaOfXml( } } - override def eval(v: InternalRow): Any = { - val dataType = xmlInferSchema.infer(xml.toString).get match { - case st: StructType => - xmlInferSchema.canonicalizeType(st).getOrElse(StructType(Nil)) - case at: ArrayType if at.elementType.isInstanceOf[StructType] => - xmlInferSchema - .canonicalizeType(at.elementType) - .map(ArrayType(_, containsNull = at.containsNull)) - .getOrElse(ArrayType(StructType(Nil), containsNull = at.containsNull)) - case other: DataType => - xmlInferSchema.canonicalizeType(other).getOrElse(SQLConf.get.defaultStringType) - } - - UTF8String.fromString(dataType.sql) - } - override def prettyName: String = "schema_of_xml" override protected def withNewChildInternal(newChild: Expression): SchemaOfXml = copy(child = newChild) + + @transient + private lazy val evaluator: SchemaOfXmlEvaluator = SchemaOfXmlEvaluator(options) + + override def replacement: Expression = Invoke( + Literal.create(evaluator, ObjectType(classOf[SchemaOfXmlEvaluator])), + "evaluate", + dataType, + Seq(child), + Seq(child.dataType) + ) } /** From a389d89c707f90396265b6e091e531aa00b39230 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 22 Oct 2024 15:30:56 +0800 Subject: [PATCH 2/4] add XmlExpressionEvalUtils --- .../xml/XmlExpressionEvalUtils.scala | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XmlExpressionEvalUtils.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XmlExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XmlExpressionEvalUtils.scala new file mode 100644 index 0000000000000..0e93be480bee2 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XmlExpressionEvalUtils.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions.xml + +import org.apache.spark.sql.catalyst.util.DropMalformedMode +import org.apache.spark.sql.catalyst.xml.{XmlInferSchema, XmlOptions} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, StructType} +import org.apache.spark.unsafe.types.UTF8String + +case class SchemaOfXmlEvaluator(options: Map[String, String]) { + + @transient + private lazy val xmlOptions = new XmlOptions(options, "UTC") + + @transient + private lazy val xmlInferSchema = { + if (xmlOptions.parseMode == DropMalformedMode) { + throw QueryCompilationErrors.parseModeUnsupportedError("schema_of_xml", xmlOptions.parseMode) + } + new XmlInferSchema(xmlOptions, caseSensitive = SQLConf.get.caseSensitiveAnalysis) + } + + final def evaluate(xml: UTF8String): Any = { + val dataType = xmlInferSchema.infer(xml.toString).get match { + case st: StructType => + xmlInferSchema.canonicalizeType(st).getOrElse(StructType(Nil)) + case at: ArrayType if at.elementType.isInstanceOf[StructType] => + xmlInferSchema + .canonicalizeType(at.elementType) + .map(ArrayType(_, containsNull = at.containsNull)) + .getOrElse(ArrayType(StructType(Nil), containsNull = at.containsNull)) + case other: DataType => + xmlInferSchema.canonicalizeType(other).getOrElse(SQLConf.get.defaultStringType) + } + + UTF8String.fromString(dataType.sql) + } +} From efe2a11fb5596c0205964c2c3412857fefede1b9 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 13 Nov 2024 19:00:46 +0800 Subject: [PATCH 3/4] update --- .../xml/XmlExpressionEvalUtils.scala | 20 +++---------- .../catalyst/expressions/xmlExpressions.scala | 30 ++++++++++++------- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XmlExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XmlExpressionEvalUtils.scala index 0e93be480bee2..dff88475327a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XmlExpressionEvalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XmlExpressionEvalUtils.scala @@ -14,29 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.spark.sql.catalyst.expressions.xml -import org.apache.spark.sql.catalyst.util.DropMalformedMode -import org.apache.spark.sql.catalyst.xml.{XmlInferSchema, XmlOptions} -import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.catalyst.xml.XmlInferSchema import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, DataType, StructType} import org.apache.spark.unsafe.types.UTF8String -case class SchemaOfXmlEvaluator(options: Map[String, String]) { - - @transient - private lazy val xmlOptions = new XmlOptions(options, "UTC") - - @transient - private lazy val xmlInferSchema = { - if (xmlOptions.parseMode == DropMalformedMode) { - throw QueryCompilationErrors.parseModeUnsupportedError("schema_of_xml", xmlOptions.parseMode) - } - new XmlInferSchema(xmlOptions, caseSensitive = SQLConf.get.caseSensitiveAnalysis) - } +object XmlExpressionEvalUtils { - final def evaluate(xml: UTF8String): Any = { + def schemaOfXml(xmlInferSchema: XmlInferSchema, xml: UTF8String): UTF8String = { val dataType = xmlInferSchema.infer(xml.toString).get match { case st: StructType => xmlInferSchema.canonicalizeType(st).getOrElse(StructType(Nil)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala index 1403e48f5025e..009ee1667b0e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala @@ -22,11 +22,11 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.expressions.objects.Invoke -import org.apache.spark.sql.catalyst.expressions.xml.SchemaOfXmlEvaluator -import org.apache.spark.sql.catalyst.util.{FailFastMode, FailureSafeParser, PermissiveMode} +import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke +import org.apache.spark.sql.catalyst.expressions.xml.XmlExpressionEvalUtils +import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, FailureSafeParser, PermissiveMode} import org.apache.spark.sql.catalyst.util.TypeUtils._ -import org.apache.spark.sql.catalyst.xml.{StaxXmlGenerator, StaxXmlParser, ValidatorUtil, XmlOptions} +import org.apache.spark.sql.catalyst.xml.{StaxXmlGenerator, StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.types.StringTypeWithCollation @@ -191,14 +191,24 @@ case class SchemaOfXml( copy(child = newChild) @transient - private lazy val evaluator: SchemaOfXmlEvaluator = SchemaOfXmlEvaluator(options) + private lazy val xmlOptions = new XmlOptions(options, "UTC") - override def replacement: Expression = Invoke( - Literal.create(evaluator, ObjectType(classOf[SchemaOfXmlEvaluator])), - "evaluate", + @transient + private lazy val xmlInferSchema = { + if (xmlOptions.parseMode == DropMalformedMode) { + throw QueryCompilationErrors.parseModeUnsupportedError("schema_of_xml", xmlOptions.parseMode) + } + new XmlInferSchema(xmlOptions, caseSensitive = SQLConf.get.caseSensitiveAnalysis) + } + + @transient private lazy val xmlInferSchemaObjectType = ObjectType(classOf[XmlInferSchema]) + + override def replacement: Expression = StaticInvoke( + XmlExpressionEvalUtils.getClass, dataType, - Seq(child), - Seq(child.dataType) + "schemaOfXml", + Seq(Literal(xmlInferSchema, xmlInferSchemaObjectType), child), + Seq(xmlInferSchemaObjectType, child.dataType) ) } From 4ab91ecbd952f6014327bd190f43b68c4027fdce Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 13 Nov 2024 19:07:56 +0800 Subject: [PATCH 4/4] update --- .../catalyst/expressions/xmlExpressions.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala index 009ee1667b0e3..6f004cbce4262 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala @@ -165,6 +165,17 @@ case class SchemaOfXml( override def nullable: Boolean = false + @transient + private lazy val xmlOptions = new XmlOptions(options, "UTC") + + @transient + private lazy val xmlInferSchema = { + if (xmlOptions.parseMode == DropMalformedMode) { + throw QueryCompilationErrors.parseModeUnsupportedError("schema_of_xml", xmlOptions.parseMode) + } + new XmlInferSchema(xmlOptions, caseSensitive = SQLConf.get.caseSensitiveAnalysis) + } + @transient private lazy val xml = child.eval().asInstanceOf[UTF8String] @@ -190,17 +201,6 @@ case class SchemaOfXml( override protected def withNewChildInternal(newChild: Expression): SchemaOfXml = copy(child = newChild) - @transient - private lazy val xmlOptions = new XmlOptions(options, "UTC") - - @transient - private lazy val xmlInferSchema = { - if (xmlOptions.parseMode == DropMalformedMode) { - throw QueryCompilationErrors.parseModeUnsupportedError("schema_of_xml", xmlOptions.parseMode) - } - new XmlInferSchema(xmlOptions, caseSensitive = SQLConf.get.caseSensitiveAnalysis) - } - @transient private lazy val xmlInferSchemaObjectType = ObjectType(classOf[XmlInferSchema]) override def replacement: Expression = StaticInvoke(