diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index a6df7690c7e47..9165c9cc29928 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -743,8 +743,33 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     selectClause.hints.asScala.foldRight(withWindow)(withHints)
   }
 
+  // Script Transform's input/output format.
+  type ScriptIOFormat =
+    (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
+
+  protected def getRowFormatDelimited(ctx: RowFormatDelimitedContext): ScriptIOFormat = {
+    // TODO we should use the visitRowFormatDelimited function here. However, HiveScriptIOSchema
+    // expects a seq of pairs in which the old parsers' token names are used as keys.
+    // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
+    // retrieving the key value pairs ourselves.
+    val entries = entry("TOK_TABLEROWFORMATFIELD", ctx.fieldsTerminatedBy) ++
+      entry("TOK_TABLEROWFORMATCOLLITEMS", ctx.collectionItemsTerminatedBy) ++
+      entry("TOK_TABLEROWFORMATMAPKEYS", ctx.keysTerminatedBy) ++
+      entry("TOK_TABLEROWFORMATNULL", ctx.nullDefinedAs) ++
+      Option(ctx.linesSeparatedBy).toSeq.map { token =>
+        val value = string(token)
+        validate(
+          value == "\n",
+          s"LINES TERMINATED BY only supports newline '\\n' right now: $value",
+          ctx)
+        "TOK_TABLEROWFORMATLINES" -> value
+      }
+
+    (entries, None, Seq.empty, None)
+  }
+
   /**
-   * Create a (Hive based) [[ScriptInputOutputSchema]].
+   * Create a [[ScriptInputOutputSchema]].
    */
   protected def withScriptIOSchema(
       ctx: ParserRuleContext,
@@ -753,7 +778,30 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
       outRowFormat: RowFormatContext,
       recordReader: Token,
       schemaLess: Boolean): ScriptInputOutputSchema = {
-    throw new ParseException("Script Transform is not supported", ctx)
+
+    def format(fmt: RowFormatContext): ScriptIOFormat = fmt match {
+      case c: RowFormatDelimitedContext =>
+        getRowFormatDelimited(c)
+
+      case c: RowFormatSerdeContext =>
+        throw new ParseException("TRANSFORM with serde is only supported in hive mode", ctx)
+
+      // SPARK-32106: When no format is defined, we return an empty result and rely on
+      // the built-in default serde in SparkScriptTransformationExec.
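+      // For example, a query with no ROW FORMAT clause at all, such as
+      //   SELECT TRANSFORM(a, b) USING 'cat' FROM t,
+      // reaches this branch with fmt == null.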
+      case null =>
+        (Nil, None, Seq.empty, None)
+    }
+
+    val (inFormat, inSerdeClass, inSerdeProps, reader) = format(inRowFormat)
+
+    val (outFormat, outSerdeClass, outSerdeProps, writer) = format(outRowFormat)
+
+    ScriptInputOutputSchema(
+      inFormat, outFormat,
+      inSerdeClass, outSerdeClass,
+      inSerdeProps, outSerdeProps,
+      reader, writer,
+      schemaLess)
   }
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 6fef18babedb6..54018198f619d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
 
 /**
  * Parser test cases for rules defined in [[CatalystSqlParser]] / [[AstBuilder]].
@@ -1031,4 +1031,115 @@ class PlanParserSuite extends AnalysisTest {
     assertEqual("select a, b from db.c;;;", table("db", "c").select('a, 'b))
     assertEqual("select a, b from db.c; ;; ;", table("db", "c").select('a, 'b))
   }
+
+  test("SPARK-32106: TRANSFORM plan") {
+    // verify schema-less
+    assertEqual(
+      """
+        |SELECT TRANSFORM(a, b, c)
+        |USING 'cat'
+        |FROM testData
+      """.stripMargin,
+      ScriptTransformation(
+        Seq('a, 'b, 'c),
+        "cat",
+        Seq(AttributeReference("key", StringType)(),
+          AttributeReference("value", StringType)()),
+        UnresolvedRelation(TableIdentifier("testData")),
+        ScriptInputOutputSchema(List.empty, List.empty, None, None,
+          List.empty, List.empty, None, None, true))
+    )
+
+    // verify with untyped output schema
+    assertEqual(
+      """
+        |SELECT TRANSFORM(a, b, c)
+        |USING 'cat' AS (a, b, c)
+        |FROM testData
+      """.stripMargin,
+      ScriptTransformation(
+        Seq('a, 'b, 'c),
+        "cat",
+        Seq(AttributeReference("a", StringType)(),
+          AttributeReference("b", StringType)(),
+          AttributeReference("c", StringType)()),
+        UnresolvedRelation(TableIdentifier("testData")),
+        ScriptInputOutputSchema(List.empty, List.empty, None, None,
+          List.empty, List.empty, None, None, false)))
+
+    // verify with typed output schema
+    assertEqual(
+      """
+        |SELECT TRANSFORM(a, b, c)
+        |USING 'cat' AS (a int, b string, c long)
+        |FROM testData
+      """.stripMargin,
+      ScriptTransformation(
+        Seq('a, 'b, 'c),
+        "cat",
+        Seq(AttributeReference("a", IntegerType)(),
+          AttributeReference("b", StringType)(),
+          AttributeReference("c", LongType)()),
+        UnresolvedRelation(TableIdentifier("testData")),
+        ScriptInputOutputSchema(List.empty, List.empty, None, None,
+          List.empty, List.empty, None, None, false)))
+
+    // verify with ROW FORMAT DELIMITED
+    assertEqual(
+      """
+        |SELECT TRANSFORM(a, b, c)
+        | ROW FORMAT DELIMITED
+        | FIELDS TERMINATED BY '\t'
+        | COLLECTION ITEMS TERMINATED BY '\u0002'
+        | MAP KEYS TERMINATED BY '\u0003'
+        | LINES TERMINATED BY '\n'
+        | NULL DEFINED AS 'null'
+        | USING 'cat' AS (a, b, c)
+        | ROW FORMAT DELIMITED
+        | FIELDS TERMINATED BY '\t'
+        | COLLECTION ITEMS TERMINATED BY '\u0004'
+        | MAP KEYS TERMINATED BY '\u0005'
+        | LINES TERMINATED BY '\n'
+        | NULL DEFINED AS 'NULL'
+        |FROM testData
+      """.stripMargin,
+      ScriptTransformation(
+        Seq('a, 'b, 'c),
+        "cat",
+        Seq(AttributeReference("a", StringType)(),
+          AttributeReference("b", StringType)(),
+          AttributeReference("c", StringType)()),
+        UnresolvedRelation(TableIdentifier("testData")),
+        ScriptInputOutputSchema(
+          Seq(("TOK_TABLEROWFORMATFIELD", "\t"),
+            ("TOK_TABLEROWFORMATCOLLITEMS", "\u0002"),
+            ("TOK_TABLEROWFORMATMAPKEYS", "\u0003"),
+            ("TOK_TABLEROWFORMATNULL", "null"),
+            ("TOK_TABLEROWFORMATLINES", "\n")),
+          Seq(("TOK_TABLEROWFORMATFIELD", "\t"),
+            ("TOK_TABLEROWFORMATCOLLITEMS", "\u0004"),
+            ("TOK_TABLEROWFORMATMAPKEYS", "\u0005"),
+            ("TOK_TABLEROWFORMATNULL", "NULL"),
+            ("TOK_TABLEROWFORMATLINES", "\n")), None, None,
+          List.empty, List.empty, None, None, false)))
+
+    // verify with ROW FORMAT SERDE
+    intercept(
+      """
+        |SELECT TRANSFORM(a, b, c)
+        | ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+        | WITH SERDEPROPERTIES(
+        |   "separatorChar" = "\t",
+        |   "quoteChar" = "'",
+        |   "escapeChar" = "\\")
+        | USING 'cat' AS (a, b, c)
+        | ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+        | WITH SERDEPROPERTIES(
+        |   "separatorChar" = "\t",
+        |   "quoteChar" = "'",
+        |   "escapeChar" = "\\")
+        |FROM testData
+      """.stripMargin,
+      "TRANSFORM with serde is only supported in hive mode")
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index c88fcecc9983b..6994aaf47dfba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -43,6 +43,7 @@ class SparkPlanner(val session: SparkSession, val experimentalMethods: Experimen
       Window ::
       JoinSelection ::
       InMemoryScans ::
+      SparkScripts ::
       BasicOperators :: Nil)
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala
new file mode 100644
index 0000000000000..75c91667012a3
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.io._
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.CircularBuffer
+
+/**
+ * Transforms the input by forking and running the specified script.
+ *
+ * @param input the set of expressions that should be passed to the script.
+ * @param script the command that should be executed.
+ * @param output the attributes that are produced by the script.
+ * @param child the plan whose output is transformed by the script.
+ * @param ioschema the class set that defines how to handle input/output data. + */ +case class SparkScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema) + extends BaseScriptTransformationExec { + + override def processIterator( + inputIterator: Iterator[InternalRow], + hadoopConf: Configuration): Iterator[InternalRow] = { + + val (outputStream, proc, inputStream, stderrBuffer) = initProc + + val outputProjection = new InterpretedProjection(inputExpressionsWithoutSerde, child.output) + + // This new thread will consume the ScriptTransformation's input rows and write them to the + // external process. That process's output will be read by this current thread. + val writerThread = SparkScriptTransformationWriterThread( + inputIterator.map(outputProjection), + inputExpressionsWithoutSerde.map(_.dataType), + ioschema, + outputStream, + proc, + stderrBuffer, + TaskContext.get(), + hadoopConf + ) + + val outputIterator = + createOutputIteratorWithoutSerde(writerThread, inputStream, proc, stderrBuffer) + + writerThread.start() + + outputIterator + } +} + +case class SparkScriptTransformationWriterThread( + iter: Iterator[InternalRow], + inputSchema: Seq[DataType], + ioSchema: ScriptTransformationIOSchema, + outputStream: OutputStream, + proc: Process, + stderrBuffer: CircularBuffer, + taskContext: TaskContext, + conf: Configuration) + extends BaseScriptTransformationWriterThread { + + override def processRows(): Unit = { + processRowsWithoutSerde() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index ba5874c21f6c4..7b633884ac4d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution} +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION /** * Concrete parser for Spark SQL statements. @@ -486,70 +487,62 @@ class SparkSqlAstBuilder extends AstBuilder { "Unsupported operation: Used defined record reader/writer classes.", ctx) } - // Decode and input/output format. - type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) - def format( - fmt: RowFormatContext, - configKey: String, - defaultConfigValue: String): Format = fmt match { - case c: RowFormatDelimitedContext => - // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema - // expects a seq of pairs in which the old parsers' token names are used as keys. - // Transforming the result of visitRowFormatDelimited would be quite a bit messier than - // retrieving the key value pairs ourselves. 
- val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ - entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) ++ - Option(c.linesSeparatedBy).toSeq.map { token => - val value = string(token) - validate( - value == "\n", - s"LINES TERMINATED BY only supports newline '\\n' right now: $value", - c) - "TOK_TABLEROWFORMATLINES" -> value + if (!conf.getConf(CATALOG_IMPLEMENTATION).equals("hive")) { + super.withScriptIOSchema( + ctx, + inRowFormat, + recordWriter, + outRowFormat, + recordReader, + schemaLess) + } else { + def format( + fmt: RowFormatContext, + configKey: String, + defaultConfigValue: String): ScriptIOFormat = fmt match { + case c: RowFormatDelimitedContext => + getRowFormatDelimited(c) + + case c: RowFormatSerdeContext => + // Use a serde format. + val SerdeInfo(None, None, Some(name), props) = visitRowFormatSerde(c) + + // SPARK-10310: Special cases LazySimpleSerDe + val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { + Option(conf.getConfString(configKey, defaultConfigValue)) + } else { + None } + (Seq.empty, Option(name), props.toSeq, recordHandler) + + case null => + // Use default (serde) format. + val name = conf.getConfString("hive.script.serde", + "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") + val props = Seq( + "field.delim" -> "\t", + "serialization.last.column.takes.rest" -> "true") + val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue)) + (Nil, Option(name), props, recordHandler) + } - (entries, None, Seq.empty, None) - - case c: RowFormatSerdeContext => - // Use a serde format. - val SerdeInfo(None, None, Some(name), props) = visitRowFormatSerde(c) - - // SPARK-10310: Special cases LazySimpleSerDe - val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { - Option(conf.getConfString(configKey, defaultConfigValue)) - } else { - None - } - (Seq.empty, Option(name), props.toSeq, recordHandler) - - case null => - // Use default (serde) format. 
- val name = conf.getConfString("hive.script.serde", - "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") - val props = Seq( - "field.delim" -> "\t", - "serialization.last.column.takes.rest" -> "true") - val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue)) - (Nil, Option(name), props, recordHandler) + val (inFormat, inSerdeClass, inSerdeProps, reader) = + format( + inRowFormat, "hive.script.recordreader", + "org.apache.hadoop.hive.ql.exec.TextRecordReader") + + val (outFormat, outSerdeClass, outSerdeProps, writer) = + format( + outRowFormat, "hive.script.recordwriter", + "org.apache.hadoop.hive.ql.exec.TextRecordWriter") + + ScriptInputOutputSchema( + inFormat, outFormat, + inSerdeClass, outSerdeClass, + inSerdeProps, outSerdeProps, + reader, writer, + schemaLess) } - - val (inFormat, inSerdeClass, inSerdeProps, reader) = - format( - inRowFormat, "hive.script.recordreader", "org.apache.hadoop.hive.ql.exec.TextRecordReader") - - val (outFormat, outSerdeClass, outSerdeProps, writer) = - format( - outRowFormat, "hive.script.recordwriter", - "org.apache.hadoop.hive.ql.exec.TextRecordWriter") - - ScriptInputOutputSchema( - inFormat, outFormat, - inSerdeClass, outSerdeClass, - inSerdeProps, outSerdeProps, - reader, writer, - schemaLess) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index f5f77b03c2b1b..a8d788f59d271 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -594,6 +594,20 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } + object SparkScripts extends Strategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case logical.ScriptTransformation(input, script, output, child, ioschema) => + SparkScriptTransformationExec( + input, + script, + output, + planLater(child), + ScriptTransformationIOSchema(ioschema) + ) :: Nil + case _ => Nil + } + } + object BasicOperators extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil diff --git a/sql/core/src/test/resources/sql-tests/inputs/transform.sql b/sql/core/src/test/resources/sql-tests/inputs/transform.sql new file mode 100644 index 0000000000000..65b060eca3a62 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/transform.sql @@ -0,0 +1,195 @@ +-- Test data. 
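+-- The temporary view t covers the primitive types exercised by the tests below:
+-- string, boolean, binary, tinyint, int, smallint, bigint, float, double,
+-- decimal, timestamp and date.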
+CREATE OR REPLACE TEMPORARY VIEW t AS SELECT * FROM VALUES
+('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')),
+('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')),
+('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03'))
+AS t(a, b, c, d, e, f, g, h, i, j, k, l);
+
+SELECT TRANSFORM(a)
+USING 'cat' AS (a)
+FROM t;
+
+-- with a non-existent command
+SELECT TRANSFORM(a)
+USING 'some_non_existent_command' AS (a)
+FROM t;
+
+-- with a non-existent file
+SELECT TRANSFORM(a)
+USING 'python some_non_existent_file' AS (a)
+FROM t;
+
+-- data types supported by both the no-serde and serde transform
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+  SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+  USING 'cat' AS (
+    a string,
+    b boolean,
+    c binary,
+    d tinyint,
+    e int,
+    f smallint,
+    g long,
+    h float,
+    i double,
+    j decimal(38, 18),
+    k timestamp,
+    l date)
+  FROM t
+) tmp;
+
+-- the same common data types, all read back as strings
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+  SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+  USING 'cat' AS (
+    a string,
+    b string,
+    c string,
+    d string,
+    e string,
+    f string,
+    g string,
+    h string,
+    i string,
+    j string,
+    k string,
+    l string)
+  FROM t
+) tmp;
+
+-- SPARK-32388: handle schema-less TRANSFORM
+SELECT TRANSFORM(a)
+USING 'cat'
+FROM t;
+
+SELECT TRANSFORM(a, b)
+USING 'cat'
+FROM t;
+
+SELECT TRANSFORM(a, b, c)
+USING 'cat'
+FROM t;
+
+-- return null when the returned string is incompatible with the declared type (no serde)
+SELECT TRANSFORM(a, b, c, d, e, f, g, h, i)
+USING 'cat' AS (a int, b short, c long, d byte, e float, f double, g decimal(38, 18), h date, i timestamp)
+FROM VALUES
+('a','','1231a','a','213.21a','213.21a','0a.21d','2000-04-01123','1997-0102 00:00:') tmp(a, b, c, d, e, f, g, h, i);
+
+-- SPARK-28227: transform can't run with aggregation
+SELECT TRANSFORM(b, max(a), sum(f))
+USING 'cat' AS (a, b)
+FROM t
+GROUP BY b;
+
+-- transform using MAP
+MAP a, b USING 'cat' AS (a, b) FROM t;
+
+-- transform using REDUCE
+REDUCE a, b USING 'cat' AS (a, b) FROM t;
+
+-- transform with ROW FORMAT DELIMITED
+SELECT TRANSFORM(a, b, c, null)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS 'NULL'
+USING 'cat' AS (a, b, c, d)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS 'NULL'
+FROM t;
+
+SELECT TRANSFORM(a, b, c, null)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS 'NULL'
+USING 'cat' AS (d)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS 'NULL'
+FROM t;
+
+-- transform with ROW FORMAT DELIMITED handles a schema with correct types
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+  SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+    ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '\n'
+    NULL DEFINED AS 'NULL'
+  USING 'cat' AS (
+    a string,
+    b boolean,
+    c binary,
+    d tinyint,
+    e int,
+    f smallint,
+    g long,
+    h float,
+    i double,
+    j decimal(38, 18),
+    k timestamp,
+    l date)
+    ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '\n'
+    NULL DEFINED AS 'NULL'
+  FROM t
+) tmp;
+
+-- transform with ROW FORMAT DELIMITED handles a schema with wrong types
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+  SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+    ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '\n'
+    NULL DEFINED AS 'NULL'
+  USING 'cat' AS (
+    a string,
+    b long,
+    c binary,
+    d tinyint,
+    e int,
+    f smallint,
+    g long,
+    h float,
+    i double,
+    j decimal(38, 18),
+    k int,
+    l long)
+    ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '\n'
+    NULL DEFINED AS 'NULL'
+  FROM t
+) tmp;
+
+-- with ROW FORMAT DELIMITED, LINES TERMINATED BY only supports '\n'
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+  SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+    ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '@'
+    NULL DEFINED AS 'NULL'
+  USING 'cat' AS (
+    a string,
+    b string,
+    c string,
+    d string,
+    e string,
+    f string,
+    g string,
+    h string,
+    i string,
+    j string,
+    k string,
+    l string)
+    ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '@'
+    NULL DEFINED AS 'NULL'
+  FROM t
+) tmp;
diff --git a/sql/core/src/test/resources/sql-tests/results/transform.sql.out b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
new file mode 100644
index 0000000000000..83ab5cb729c24
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/transform.sql.out
@@ -0,0 +1,357 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 18
+
+
+-- !query
+CREATE OR REPLACE TEMPORARY VIEW t AS SELECT * FROM VALUES
+('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')),
+('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')),
+('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03'))
+AS t(a, b, c, d, e, f, g, h, i, j, k, l)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT TRANSFORM(a)
+USING 'cat' AS (a)
+FROM t
+-- !query schema
+struct<a:string>
+-- !query output
+1
+2
+3
+
+
+-- !query
+SELECT TRANSFORM(a)
+USING 'some_non_existent_command' AS (a)
+FROM t
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkException
+Subprocess exited with status 127. Error: /bin/bash: some_non_existent_command: command not found
+
+
+-- !query
+SELECT TRANSFORM(a)
+USING 'python some_non_existent_file' AS (a)
+FROM t
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkException
+Subprocess exited with status 2. Error: python: can't open file 'some_non_existent_file': [Errno 2] No such file or directory
+
+
+-- !query
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+  SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+  USING 'cat' AS (
+    a string,
+    b boolean,
+    c binary,
+    d tinyint,
+    e int,
+    f smallint,
+    g long,
+    h float,
+    i double,
+    j decimal(38, 18),
+    k timestamp,
+    l date)
+  FROM t
+) tmp
+-- !query schema
+struct<a:string,b:boolean,decode(c, UTF-8):string,d:tinyint,e:int,f:smallint,g:bigint,h:float,i:double,j:decimal(38,18),k:timestamp,l:date>
+-- !query output
+1 true Spark SQL 1 1 100 1 1.0 1.0 1.000000000000000000 1997-01-02 00:00:00 2000-04-01
+2 false Spark SQL 2 2 200 2 2.0 2.0 2.000000000000000000 1997-01-02 03:04:05 2000-04-02
+3 true Spark SQL 3 3 300 3 3.0 3.0 3.000000000000000000 1997-02-10 17:32:01 2000-04-03
+
+
+-- !query
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+  SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+  USING 'cat' AS (
+    a string,
+    b string,
+    c string,
+    d string,
+    e string,
+    f string,
+    g string,
+    h string,
+    i string,
+    j string,
+    k string,
+    l string)
+  FROM t
+) tmp
+-- !query schema
+struct<a:string,b:string,decode(c, UTF-8):string,d:string,e:string,f:string,g:string,h:string,i:string,j:string,k:string,l:string>
+-- !query output
+1 true Spark SQL 1 1 100 1 1.0 1.0 1 1997-01-02 00:00:00 2000-04-01
+2 false Spark SQL 2 2 200 2 2.0 2.0 2 1997-01-02 03:04:05 2000-04-02
+3 true Spark SQL 3 3 300 3 3.0 3.0 3 1997-02-10 17:32:01 2000-04-03
+
+
+-- !query
+SELECT TRANSFORM(a)
+USING 'cat'
+FROM t
+-- !query schema
+struct<key:string,value:string>
+-- !query output
+1 NULL
+2 NULL
+3 NULL
+
+
+-- !query
+SELECT TRANSFORM(a, b)
+USING 'cat'
+FROM t
+-- !query schema
+struct<key:string,value:string>
+-- !query output
+1 true
+2 false
+3 true
+
+
+-- !query
+SELECT TRANSFORM(a, b, c)
+USING 'cat'
+FROM t
+-- !query schema
+struct<key:string,value:string>
+-- !query output
+1 true
+2 false
+3 true
+
+
+-- !query
+SELECT TRANSFORM(a, b, c, d, e, f, g, h, i)
+USING 'cat' AS (a int, b short, c long, d byte, e float, f double, g decimal(38, 18), h date, i timestamp)
+FROM VALUES
+('a','','1231a','a','213.21a','213.21a','0a.21d','2000-04-01123','1997-0102 00:00:') tmp(a, b, c, d, e, f, g, h, i)
+-- !query schema
+struct<a:int,b:smallint,c:bigint,d:tinyint,e:float,f:double,g:decimal(38,18),h:date,i:timestamp>
+-- !query output
+NULL NULL NULL NULL NULL NULL NULL NULL NULL
+
+
+-- !query
+SELECT TRANSFORM(b, max(a), sum(f))
+USING 'cat' AS (a, b)
+FROM t
+GROUP BY b
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+
+mismatched input 'GROUP' expecting {<EOF>, ';'}(line 4, pos 0)
+
+== SQL ==
+SELECT TRANSFORM(b, max(a), sum(f))
+USING 'cat' AS (a, b)
+FROM t
+GROUP BY b
+^^^
+
+
+-- !query
+MAP a, b USING 'cat' AS (a, b) FROM t
+-- !query schema
+struct<a:string,b:string>
+-- !query output
+1 true
+2 false
+3 true
+
+
+-- !query
+REDUCE a, b USING 'cat' AS (a, b) FROM t
+-- !query schema
+struct<a:string,b:string>
+-- !query output
+1 true
+2 false
+3 true
+
+
+-- !query
+SELECT TRANSFORM(a, b, c, null)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS 'NULL'
+USING 'cat' AS (a, b, c, d)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS 'NULL'
+FROM t
+-- !query schema
+struct<a:string,b:string,c:string,d:string>
+-- !query output
+1 true Spark SQL null
+2 false Spark SQL null
+3 true Spark SQL null
+
+
+-- !query
+SELECT TRANSFORM(a, b, c, null)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS 'NULL'
+USING 'cat' AS (d)
+  ROW FORMAT DELIMITED
+  FIELDS TERMINATED BY '@'
+  LINES TERMINATED BY '\n'
+  NULL DEFINED AS 'NULL'
+FROM t
+-- !query schema
+struct<d:string>
+-- !query output
+1
+2
+3
+
+
+-- !query
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+  SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+    ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '\n'
+    NULL DEFINED AS 'NULL'
+  USING 'cat' AS (
+    a string,
+    b boolean,
+    c binary,
+    d tinyint,
+    e int,
+    f smallint,
+    g long,
+    h float,
+    i double,
+    j decimal(38, 18),
+    k timestamp,
+    l date)
+    ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '\n'
+    NULL DEFINED AS 'NULL'
+  FROM t
+) tmp
+-- !query schema
+struct<a:string,b:boolean,decode(c, UTF-8):string,d:tinyint,e:int,f:smallint,g:bigint,h:float,i:double,j:decimal(38,18),k:timestamp,l:date>
+-- !query output
+1 true Spark SQL 1 1 100 1 1.0 1.0 1.000000000000000000 1997-01-02 00:00:00 2000-04-01
+2 false Spark SQL 2 2 200 2 2.0 2.0 2.000000000000000000 1997-01-02 03:04:05 2000-04-02
+3 true Spark SQL 3 3 300 3 3.0 3.0 3.000000000000000000 1997-02-10 17:32:01 2000-04-03
+
+
+-- !query
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+  SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+    ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '\n'
+    NULL DEFINED AS 'NULL'
+  USING 'cat' AS (
+    a string,
+    b long,
+    c binary,
+    d tinyint,
+    e int,
+    f smallint,
+    g long,
+    h float,
+    i double,
+    j decimal(38, 18),
+    k int,
+    l long)
+    ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '\n'
+    NULL DEFINED AS 'NULL'
+  FROM t
+) tmp
+-- !query schema
+struct<a:string,b:bigint,decode(c, UTF-8):string,d:tinyint,e:int,f:smallint,g:bigint,h:float,i:double,j:decimal(38,18),k:int,l:bigint>
+-- !query output
+1 NULL Spark SQL 1 1 100 1 1.0 1.0 1.000000000000000000 NULL NULL
+2 NULL Spark SQL 2 2 200 2 2.0 2.0 2.000000000000000000 NULL NULL
+3 NULL Spark SQL 3 3 300 3 3.0 3.0 3.000000000000000000 NULL NULL
+
+
+-- !query
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+  SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+    ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '@'
+    NULL DEFINED AS 'NULL'
+  USING 'cat' AS (
+    a string,
+    b string,
+    c string,
+    d string,
+    e string,
+    f string,
+    g string,
+    h string,
+    i string,
+    j string,
+    k string,
+    l string)
+    ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '@'
+    NULL DEFINED AS 'NULL'
+  FROM t
+) tmp
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+
+LINES TERMINATED BY only supports newline '\n' right now: @(line 3, pos 4)
+
+== SQL ==
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+  SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+    ROW FORMAT DELIMITED
+----^^^
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '@'
+    NULL DEFINED AS 'NULL'
+  USING 'cat' AS (
+    a string,
+    b string,
+    c string,
+    d string,
+    e string,
+    f string,
+    g string,
+    h string,
+    i string,
+    j string,
+    k string,
+    l string)
+    ROW FORMAT DELIMITED
+    FIELDS TERMINATED BY ','
+    LINES TERMINATED BY '@'
+    NULL DEFINED AS 'NULL'
+  FROM t
+) tmp
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 02c6fba9725d3..eb2caa61e1590 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -24,7 +24,7 @@ import java.util.Locale
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SparkConf, SparkException, TestUtils}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.SQLHelper
@@ -260,6 +260,9 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
       newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER")
     }
 
+    // SPARK-32106: the SQL test 'transform.sql' added here relies on the `cat`
+    // command, so we need to check that it is available first.
+    assume(TestUtils.testCommandAvailable("/bin/bash"))
     val input = fileToString(new File(testCase.inputFile))
 
     val (comments, code) = splitCommentsAndCodes(input)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala
new file mode 100644
index 0000000000000..6ff7c5d6d2f3a
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{SparkException, TestUtils}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SparkScriptTransformationSuite extends BaseScriptTransformationSuite with SharedSparkSession {
+  import testImplicits._
+
+  override def createScriptTransformationExec(
+      input: Seq[Expression],
+      script: String,
+      output: Seq[Attribute],
+      child: SparkPlan,
+      ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec = {
+    SparkScriptTransformationExec(
+      input = input,
+      script = script,
+      output = output,
+      child = child,
+      ioschema = ioschema
+    )
+  }
+
+  test("SPARK-32106: TRANSFORM with serde without hive should throw an exception") {
+    assume(TestUtils.testCommandAvailable("/bin/bash"))
+    withTempView("v") {
+      val df = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
+      df.createTempView("v")
+
+      val e = intercept[ParseException] {
+        sql(
+          """
+            |SELECT TRANSFORM (a)
+            |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+            |USING 'cat' AS (a)
+            |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+            |FROM v
+          """.stripMargin)
+      }.getMessage
+      assert(e.contains("TRANSFORM with serde is only supported in hive mode"))
+    }
+  }
+
+  test("SPARK-32106: TRANSFORM doesn't support ArrayType/MapType/StructType " +
+    "as output data type (no serde)") {
+    assume(TestUtils.testCommandAvailable("/bin/bash"))
+    // check for ArrayType
+    val e1 = intercept[SparkException] {
+      sql(
+        """
+          |SELECT TRANSFORM(a)
+          |USING 'cat' AS (a array<int>)
+          |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
+        """.stripMargin).collect()
+    }.getMessage
+    assert(e1.contains("SparkScriptTransformation without serde does not support" +
+      " ArrayType as output data type"))
+
+    // check for MapType
+    val e2 = intercept[SparkException] {
+      sql(
+        """
+          |SELECT TRANSFORM(b)
+          |USING 'cat' AS (b map<string, int>)
+          |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
+        """.stripMargin).collect()
+    }.getMessage
+    assert(e2.contains("SparkScriptTransformation without serde does not support" +
+      " MapType as output data type"))
+
+    // check for StructType
+    val e3 = intercept[SparkException] {
+      sql(
+        """
+          |SELECT TRANSFORM(c)
+          |USING 'cat' AS (c struct<col1:int, col2:string>)
+          |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
+        """.stripMargin).collect()
+    }.getMessage
+    assert(e3.contains("SparkScriptTransformation without serde does not support" +
+      " StructType as output data type"))
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
index 26baff3d83eec..4b03cff5e8c8e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
@@ -45,6 +45,8 @@ import org.apache.spark.util.{CircularBuffer, Utils}
  * @param input the set of expression that should be passed to the script.
  * @param script the command that should be executed.
  * @param output the attributes that are produced by the script.
+ * @param child the plan whose output is transformed by the script.
+ * @param ioschema the class set that defines how to handle input/output data.
  */
 case class HiveScriptTransformationExec(
     input: Seq[Expression],