Merged
Changes from all 25 commits
44151c7
[SPARK-40663][SQL] Migrate execution errors onto error classes: _LEGA…
itholic Oct 9, 2022
f77c8f3
[SPARK-40712][BUILD] Upgrade `sbt-assembly` plugin to 1.2.0
LuciferYang Oct 9, 2022
51e8ca3
[SPARK-40699][DOCS] Supplement undocumented yarn configurations in do…
dcoliversun Oct 9, 2022
4b8f0e5
[SPARK-40709][DOCS] Supplement undocumented avro configurations in do…
dcoliversun Oct 9, 2022
f39b75c
[SPARK-40710][DOCS] Supplement undocumented parquet configurations in…
dcoliversun Oct 9, 2022
cd7ca92
[SPARK-40675][DOCS] Supplement undocumented spark configurations in `…
dcoliversun Oct 9, 2022
357193d
[SPARK-40663][SQL] Migrate execution errors onto error classes: _LEGA…
itholic Oct 9, 2022
cf05383
[SPARK-40718][CONNECT] Replace `grpc-netty-shaded` with `grpc-netty`
grundprinzip Oct 10, 2022
d323fa8
[SPARK-40448][CONNECT][FOLLOWUP] Use more suitable variable name and …
beliefer Oct 10, 2022
76a7c5a
[SPARK-40724][PS] Simplify `corr` with method `inline`
zhengruifeng Oct 10, 2022
3fa958a
[SPARK-40725][INFRA] Add `mypy-protobuf` to dev/requirements
zhengruifeng Oct 10, 2022
81d8aa5
[SPARK-40665][CONNECT][FOLLOW-UP] Fix `connector/connect/dev/generate…
zhengruifeng Oct 10, 2022
52a66c9
[SPARK-40718][BUILD][CONNECT][FOLLOWUP] Explicitly add Netty related …
LuciferYang Oct 10, 2022
9e8198d
[SPARK-40726][DOCS] Supplement undocumented orc configurations in doc…
dcoliversun Oct 10, 2022
9a97f8c
[SPARK-40705][SQL] Handle case of using mutable array when converting…
Amraneze Oct 10, 2022
288bdd2
[SPARK-40714][SQL] Remove `PartitionAlreadyExistsException`
MaxGekk Oct 10, 2022
67c6408
[SPARK-40534][CONNECT] Extend the support for Join with different joi…
amaliujia Oct 11, 2022
f8d68b0
[SPARK-40725][INFRA][FOLLOWUP] Mark mypy-protobuf as an optional depe…
amaliujia Oct 11, 2022
4eb0edf
[SPARK-40596][CORE] Populate ExecutorDecommission with messages in Ex…
bozhang2820 Oct 11, 2022
e927a7e
[SPARK-40677][CONNECT][FOLLOWUP] Refactor shade `relocation/rename` r…
LuciferYang Oct 11, 2022
d59f71c
[SPARK-40698][PS][SQL] Improve the precision of `product` for integra…
zhengruifeng Oct 11, 2022
4e4a848
[SPARK-40707][CONNECT] Add groupby to connect DSL and test more than …
amaliujia Oct 11, 2022
47d119d
[SPARK-40358][SQL] Migrate collection type check failures onto error …
lvshaokang Oct 11, 2022
9ddd734
[SPARK-40740][SQL] Improve listFunctions in SessionCatalog
allisonwang-db Oct 11, 2022
8e85393
[SPARK-40667][SQL] Refactor File Data Source Options
xiaonanyang-db Oct 11, 2022
3 changes: 2 additions & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3961,7 +3961,8 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
# It makes sure that we can omit path argument in write.df API and then it calls
# DataFrameWriter.save() without path.
expect_error(write.df(df, source = "csv"),
"Error in save : illegal argument - Expected exactly one path to be specified")
paste("Error in save : org.apache.spark.SparkIllegalArgumentException:",
"Expected exactly one path to be specified"))
expect_error(write.json(df, jsonPath),
"Error in json : analysis error - Path file:.*already exists")
expect_error(write.text(df, jsonPath),
AvroOptions.scala (org.apache.spark.sql.avro)
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FileSourceOptions
import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
import org.apache.spark.sql.internal.SQLConf

@@ -37,6 +37,8 @@ private[sql] class AvroOptions(
@transient val conf: Configuration)
extends FileSourceOptions(parameters) with Logging {

import AvroOptions._

def this(parameters: Map[String, String], conf: Configuration) = {
this(CaseInsensitiveMap(parameters), conf)
}
@@ -54,8 +56,8 @@
* instead of "string" type in the default converted schema.
*/
val schema: Option[Schema] = {
parameters.get("avroSchema").map(new Schema.Parser().setValidateDefaults(false).parse).orElse({
val avroUrlSchema = parameters.get("avroSchemaUrl").map(url => {
parameters.get(AVRO_SCHEMA).map(new Schema.Parser().setValidateDefaults(false).parse).orElse({
val avroUrlSchema = parameters.get(AVRO_SCHEMA_URL).map(url => {
log.debug("loading avro schema from url: " + url)
val fs = FileSystem.get(new URI(url), conf)
val in = fs.open(new Path(url))
@@ -75,20 +77,20 @@
* whose field names do not match. Defaults to false.
*/
val positionalFieldMatching: Boolean =
parameters.get("positionalFieldMatching").exists(_.toBoolean)
parameters.get(POSITIONAL_FIELD_MATCHING).exists(_.toBoolean)

/**
* Top level record name in write result, which is required in Avro spec.
* See https://avro.apache.org/docs/1.11.1/specification/#schema-record .
* Default value is "topLevelRecord"
*/
val recordName: String = parameters.getOrElse("recordName", "topLevelRecord")
val recordName: String = parameters.getOrElse(RECORD_NAME, "topLevelRecord")

/**
* Record namespace in write result. Default value is "".
* See Avro spec for details: https://avro.apache.org/docs/1.11.1/specification/#schema-record .
*/
val recordNamespace: String = parameters.getOrElse("recordNamespace", "")
val recordNamespace: String = parameters.getOrElse(RECORD_NAMESPACE, "")

/**
* The `ignoreExtension` option controls ignoring of files without `.avro` extensions in read.
@@ -104,7 +106,7 @@ private[sql] class AvroOptions(
ignoreFilesWithoutExtensionByDefault)

parameters
.get(AvroOptions.ignoreExtensionKey)
.get(IGNORE_EXTENSION)
.map(_.toBoolean)
.getOrElse(!ignoreFilesWithoutExtension)
}
@@ -116,21 +118,21 @@
* taken into account. If the former one is not set too, the `snappy` codec is used by default.
*/
val compression: String = {
parameters.get("compression").getOrElse(SQLConf.get.avroCompressionCodec)
parameters.get(COMPRESSION).getOrElse(SQLConf.get.avroCompressionCodec)
}

val parseMode: ParseMode =
parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
parameters.get(MODE).map(ParseMode.fromString).getOrElse(FailFastMode)

/**
* The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
*/
val datetimeRebaseModeInRead: String = parameters
.get(AvroOptions.DATETIME_REBASE_MODE)
.get(DATETIME_REBASE_MODE)
.getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ))
}

private[sql] object AvroOptions {
private[sql] object AvroOptions extends DataSourceOptions {
def apply(parameters: Map[String, String]): AvroOptions = {
val hadoopConf = SparkSession
.getActiveSession
@@ -139,11 +141,17 @@ private[sql] object AvroOptions {
new AvroOptions(CaseInsensitiveMap(parameters), hadoopConf)
}

val ignoreExtensionKey = "ignoreExtension"

val IGNORE_EXTENSION = newOption("ignoreExtension")
val MODE = newOption("mode")
val RECORD_NAME = newOption("recordName")
val COMPRESSION = newOption("compression")
val AVRO_SCHEMA = newOption("avroSchema")
val AVRO_SCHEMA_URL = newOption("avroSchemaUrl")
val RECORD_NAMESPACE = newOption("recordNamespace")
val POSITIONAL_FIELD_MATCHING = newOption("positionalFieldMatching")
// The option controls rebasing of the DATE and TIMESTAMP values between
// Julian and Proleptic Gregorian calendars. It impacts on the behaviour of the Avro
// datasource similarly to the SQL config `spark.sql.avro.datetimeRebaseModeInRead`,
// and can be set to the same values: `EXCEPTION`, `LEGACY` or `CORRECTED`.
val DATETIME_REBASE_MODE = "datetimeRebaseMode"
val DATETIME_REBASE_MODE = newOption("datetimeRebaseMode")
}
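The change above replaces the ad-hoc string keys in AvroOptions with constants registered through the shared DataSourceOptions trait. The trait itself is not shown in this diff, so the Scala sketch below is only an assumption of its shape, inferred from how newOption, getAllOptions, and isValidOption are used here and in the AvroSuite test further down; it is not the actual org.apache.spark.sql.catalyst.DataSourceOptions source.

// Hypothetical sketch of a DataSourceOptions-style registry (assumption, not Spark source).
trait OptionRegistry {
  import scala.collection.mutable

  // Option names registered by the data source via newOption.
  private val registered = mutable.Set.empty[String]

  // Registers a valid option key and returns it, so it can be assigned to a constant.
  protected def newOption(name: String): String = {
    registered += name
    name
  }

  // All option names registered so far.
  def getAllOptions: Set[String] = registered.toSet

  // Whether the given name was registered via newOption.
  def isValidOption(name: String): Boolean = registered.contains(name)
}

// Usage pattern mirroring the AvroOptions companion object in this diff.
object ExampleOptions extends OptionRegistry {
  val COMPRESSION: String = newOption("compression")
  val MODE: String = newOption("mode")
}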
AvroUtils.scala (org.apache.spark.sql.avro)
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroOptions.ignoreExtensionKey
import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION
import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
@@ -50,8 +50,8 @@ private[sql] object AvroUtils extends Logging {
val conf = spark.sessionState.newHadoopConfWithOptions(options)
val parsedOptions = new AvroOptions(options, conf)

if (parsedOptions.parameters.contains(ignoreExtensionKey)) {
logWarning(s"Option $ignoreExtensionKey is deprecated. Please use the " +
if (parsedOptions.parameters.contains(IGNORE_EXTENSION)) {
logWarning(s"Option $IGNORE_EXTENSION is deprecated. Please use the " +
"general data source option pathGlobFilter for filtering file names.")
}
// User can specify an optional avro json schema.
AvroSuite.scala (org.apache.spark.sql.avro)
@@ -1804,13 +1804,13 @@ abstract class AvroSuite
spark
.read
.format("avro")
.option(AvroOptions.ignoreExtensionKey, false)
.option(AvroOptions.IGNORE_EXTENSION, false)
.load(dir.getCanonicalPath)
.count()
}
val deprecatedEvents = logAppender.loggingEvents
.filter(_.getMessage.getFormattedMessage.contains(
s"Option ${AvroOptions.ignoreExtensionKey} is deprecated"))
s"Option ${AvroOptions.IGNORE_EXTENSION} is deprecated"))
assert(deprecatedEvents.size === 1)
}
}
@@ -2272,6 +2272,20 @@ abstract class AvroSuite
checkAnswer(df2, df.collect().toSeq)
}
}

test("SPARK-40667: validate Avro Options") {
assert(AvroOptions.getAllOptions.size == 9)
// Please add validation on any new Avro options here
assert(AvroOptions.isValidOption("ignoreExtension"))
assert(AvroOptions.isValidOption("mode"))
assert(AvroOptions.isValidOption("recordName"))
assert(AvroOptions.isValidOption("compression"))
assert(AvroOptions.isValidOption("avroSchema"))
assert(AvroOptions.isValidOption("avroSchemaUrl"))
assert(AvroOptions.isValidOption("recordNamespace"))
assert(AvroOptions.isValidOption("positionalFieldMatching"))
assert(AvroOptions.isValidOption("datetimeRebaseMode"))
}
}

class AvroV1Suite extends AvroSuite {
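For reference, these option keys are what end users pass through the standard DataFrameReader API. A minimal usage sketch follows; the input path is made up, and the spark-avro module is assumed to be on the classpath.

import org.apache.spark.sql.SparkSession

object AvroReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("avro-options-example").getOrCreate()

    // Keys such as "datetimeRebaseMode" and "positionalFieldMatching" are among the
    // options validated by AvroOptions above; the path below is hypothetical.
    val df = spark.read
      .format("avro")
      .option("datetimeRebaseMode", "CORRECTED")
      .option("positionalFieldMatching", "true")
      .load("/tmp/example.avro")

    df.show()
    spark.stop()
  }
}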
2 changes: 1 addition & 1 deletion connector/connect/dev/generate_protos.sh
@@ -16,7 +16,7 @@
#
set -ex

SPARK_HOME="$(cd "`dirname $0`"/../..; pwd)"
SPARK_HOME="$(cd "`dirname $0`"/../../..; pwd)"
cd "$SPARK_HOME"

pushd connector/connect/src/main
86 changes: 72 additions & 14 deletions connector/connect/pom.xml
@@ -134,7 +134,7 @@
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<artifactId>grpc-netty</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
@@ -152,6 +152,24 @@
<artifactId>grpc-stub</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>${netty.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
<version>${netty.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
<version>${netty.version}</version>
<scope>provided</scope>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
@@ -250,11 +268,13 @@
as assembly build.
-->
<include>com.google.android:annotations</include>
<include>com.google.api.grpc:proto-google-common-proto</include>
<include>com.google.api.grpc:proto-google-common-protos</include>
<include>io.perfmark:perfmark-api</include>
<include>org.codehaus.mojo:animal-sniffer-annotations</include>
<include>com.google.errorprone:error_prone_annotations</include>
<include>com.google.j2objc:j2objc-annotations</include>
<include>org.checkerframework:checker-qual</include>
<include>com.google.code.gson:gson</include>
</includes>
</artifactSet>
<relocations>
@@ -285,28 +305,66 @@
</relocation>

<relocation>
<pattern>com.google.android</pattern>
<shadedPattern>${spark.shade.packageName}.connect.android</shadedPattern>
<pattern>android.annotation</pattern>
<shadedPattern>${spark.shade.packageName}.connect.android_annotation</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.api.grpc</pattern>
<shadedPattern>${spark.shade.packageName}.connect.api</shadedPattern>
<pattern>io.perfmark</pattern>
<shadedPattern>${spark.shade.packageName}.connect.io_perfmark</shadedPattern>
</relocation>
<relocation>
<pattern>io.perfmark</pattern>
<shadedPattern>${spark.shade.packageName}.connect.perfmark</shadedPattern>
<pattern>org.codehaus.mojo.animal_sniffer</pattern>
<shadedPattern>${spark.shade.packageName}.connect.animal_sniffer</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.j2objc.annotations</pattern>
<shadedPattern>${spark.shade.packageName}.connect.j2objc_annotations</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.errorprone.annotations</pattern>
<shadedPattern>${spark.shade.packageName}.connect.errorprone_annotations</shadedPattern>
</relocation>
<relocation>
<pattern>org.checkerframework</pattern>
<shadedPattern>${spark.shade.packageName}.connect.checkerframework</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.gson</pattern>
<shadedPattern>${spark.shade.packageName}.connect.gson</shadedPattern>
</relocation>

<!--
For `com.google.api.grpc:proto-google-common-protos`, do not directly define the pattern
as `com.google`; otherwise the relocation result may be uncertain due to the order in
which the rules are applied.
-->
<relocation>
<pattern>com.google.api</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.api</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.cloud</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.cloud</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.geo</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.geo</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.logging</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.logging</shadedPattern>
</relocation>
<relocation>
<pattern>org.codehaus.mojo</pattern>
<shadedPattern>${spark.shade.packageName}.connect.mojo</shadedPattern>
<pattern>com.google.longrunning</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.longrunning</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.errorprone</pattern>
<shadedPattern>${spark.shade.packageName}.connect.errorprone</shadedPattern>
<pattern>com.google.rpc</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.rpc</shadedPattern>
</relocation>
<relocation>
<pattern>com.com.google.j2objc</pattern>
<shadedPattern>${spark.shade.packageName}.connect.j2objc</shadedPattern>
<pattern>com.google.type</pattern>
<shadedPattern>${spark.shade.packageName}.connect.google_protos.type</shadedPattern>
</relocation>
</relocations>
</configuration>
connector/connect/src/main/protobuf/spark/connect/commands.proto
@@ -44,8 +44,8 @@ message CreateScalarFunction {
repeated string parts = 1;
FunctionLanguage language = 2;
bool temporary = 3;
repeated Type argument_types = 4;
Type return_type = 5;
repeated DataType argument_types = 4;
DataType return_type = 5;

// How the function body is defined:
oneof function_definition {
connector/connect/src/main/protobuf/spark/connect/expressions.proto
@@ -65,10 +65,10 @@ message Expression {
// Timestamp in units of microseconds since the UNIX epoch.
int64 timestamp_tz = 27;
bytes uuid = 28;
Type null = 29; // a typed null literal
DataType null = 29; // a typed null literal
List list = 30;
Type.List empty_list = 31;
Type.Map empty_map = 32;
DataType.List empty_list = 31;
DataType.Map empty_map = 32;
UserDefined user_defined = 33;
}

@@ -164,5 +164,6 @@ message Expression {
// by the analyzer.
message QualifiedAttribute {
string name = 1;
DataType type = 2;
}
}
27 changes: 7 additions & 20 deletions connector/connect/src/main/protobuf/spark/connect/relations.proto
@@ -94,16 +94,17 @@ message Filter {
message Join {
Relation left = 1;
Relation right = 2;
Expression on = 3;
JoinType how = 4;
Expression join_condition = 3;
JoinType join_type = 4;

enum JoinType {
JOIN_TYPE_UNSPECIFIED = 0;
JOIN_TYPE_INNER = 1;
JOIN_TYPE_OUTER = 2;
JOIN_TYPE_FULL_OUTER = 2;
JOIN_TYPE_LEFT_OUTER = 3;
JOIN_TYPE_RIGHT_OUTER = 4;
JOIN_TYPE_ANTI = 5;
JOIN_TYPE_LEFT_ANTI = 5;
JOIN_TYPE_LEFT_SEMI = 6;
}
}
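The renamed enum values above line up with Spark's existing Catalyst join types. The Scala sketch below illustrates that correspondence using the enum value names as strings; it is an assumption for clarity, not the actual Spark Connect server-side planner code, which works on the generated proto enum.

import org.apache.spark.sql.catalyst.plans._

object JoinTypeMapping {
  // Illustrative mapping from the proto enum value names to Catalyst join types.
  def toCatalystJoinType(protoJoinType: String): JoinType = protoJoinType match {
    case "JOIN_TYPE_INNER"       => Inner
    case "JOIN_TYPE_FULL_OUTER"  => FullOuter
    case "JOIN_TYPE_LEFT_OUTER"  => LeftOuter
    case "JOIN_TYPE_RIGHT_OUTER" => RightOuter
    case "JOIN_TYPE_LEFT_ANTI"   => LeftAnti
    case "JOIN_TYPE_LEFT_SEMI"   => LeftSemi
    case other => throw new IllegalArgumentException(s"Unsupported join type: $other")
  }
}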

@@ -129,22 +130,8 @@ message Fetch {
// Relation of type [[Aggregate]].
message Aggregate {
Relation input = 1;

// Grouping sets are used in rollups
repeated GroupingSet grouping_sets = 2;

// Measures
repeated Measure measures = 3;

message GroupingSet {
repeated Expression aggregate_expressions = 1;
}

message Measure {
AggregateFunction function = 1;
// Conditional filter for SUM(x FILTER WHERE x < 10)
Expression filter = 2;
}
repeated Expression grouping_expressions = 2;
repeated AggregateFunction result_expressions = 3;

message AggregateFunction {
string name = 1;