@@ -33,6 +33,7 @@ import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
 
@@ -51,6 +52,13 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     checkAnswer(newEntries, originalEntries)
   }
 
+  test("resolve avro data source") {
+    Seq("avro", "com.databricks.spark.avro").foreach { provider =>
+      assert(DataSource.lookupDataSource(provider, spark.sessionState.conf) ===
+        classOf[org.apache.spark.sql.avro.AvroFileFormat])
+    }
+  }
+
   test("reading from multiple paths") {
     val df = spark.read.format("avro").load(episodesAvro, episodesAvro)
     assert(df.count == 16)
@@ -456,7 +464,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     // get the same values back.
     withTempPath { tempDir =>
       val name = "AvroTest"
-      val namespace = "com.databricks.spark.avro"
+      val namespace = "org.apache.spark.avro"
       val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
 
       val avroDir = tempDir + "/namedAvro"
@@ -571,6 +571,7 @@ object DataSource extends Logging {
     val nativeOrc = classOf[OrcFileFormat].getCanonicalName
     val socket = classOf[TextSocketSourceProvider].getCanonicalName
     val rate = classOf[RateStreamProvider].getCanonicalName
+    val avro = "org.apache.spark.sql.avro.AvroFileFormat"
 
     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
@@ -592,6 +593,7 @@
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
       "com.databricks.spark.csv" -> csv,
+      "com.databricks.spark.avro" -> avro,
       "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
       "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
     )
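
This map is applied at the top of DataSource.lookupDataSource, rewriting a legacy provider name before class resolution. A minimal sketch of the effect, mirroring the test added to AvroSuite above (assumes an active SparkSession named spark):

import org.apache.spark.sql.execution.datasources.DataSource

// The legacy Databricks name now resolves to the built-in Avro format class.
val resolved = DataSource.lookupDataSource("com.databricks.spark.avro", spark.sessionState.conf)
assert(resolved === classOf[org.apache.spark.sql.avro.AvroFileFormat])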
@@ -635,12 +637,6 @@ object DataSource extends Logging {
           "Hive built-in ORC data source must be used with Hive support enabled. " +
           "Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
           "'native'")
-      } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
-        provider1 == "com.databricks.spark.avro") {
-        throw new AnalysisException(

Member:
Should we show a message to the user about loading the built-in spark-avro jar?

Member:
Actually, I think it would be okay. If a user provides avro, it will show a warning like:

17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).

in most cases (see #17916)

gengliangwang (Member), Jul 26, 2018:
No, I mean that by default the avro package is not loaded. E.g. if we start spark-shell without loading the jar and use format("avro"), it will show the error "Failed to find data source: avro. Please find an Avro package at http://spark.apache.org/third-party-projects.html".
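
A minimal sketch of the failure described above (hypothetical session, assuming spark-shell was started without any Avro jar on the classpath; the path is illustrative, and the message is the one quoted in the comment):

scala> spark.read.format("avro").load("/tmp/episodes.avro")
org.apache.spark.sql.AnalysisException: Failed to find data source: avro.
Please find an Avro package at http://spark.apache.org/third-party-projects.html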

Member:
Eh, if users were using the external avro, they will likely hit the error if they upgrade Spark directly.
Otherwise, users will see the release note that the Avro package is included in 2.4.0, and they will not provide this jar.
If users miss the release note, they will try to explicitly provide the third-party jar, which will give the error message above.

FWIW, if it's a fully qualified path, the third-party jar will still be used in theory.

Did I misunderstand or miss something, maybe?

Member:
Or is the Avro jar meant to be separately distributed? I thought it'd be included within Spark.

Member:
I totally agree with the mapping; we should do it.
The comment here is about when Spark can't find any Avro package: we should show a message about loading the spark-avro jar (org.apache.spark.sql.avro).
Unlike CSV, the spark-avro package is not loaded by default within Spark (at least when I tried spark-shell).
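
For reference, the built-in module ships as a separate jar that can be pulled in explicitly with --packages; a hypothetical invocation, assuming a Spark 2.4.0 / Scala 2.11 build:

./bin/spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.0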

Member:
Ah, I see. Okay. Then, how about this: I assume the documentation about Avro will be added to Spark for 2.4.0 soon. When it's done, we add a message here for Avro, like "please see the documentation to use Spark's Avro package"?

Member:
Looks like the same thing could already happen with Kafka too.

Member:
Thanks @gengliangwang for clarifying this.

s"Failed to find data source: ${provider1.toLowerCase(Locale.ROOT)}. " +
"Please find an Avro package at " +
"http://spark.apache.org/third-party-projects.html")
} else {
throw new ClassNotFoundException(
s"Failed to find data source: $provider1. Please find packages at " +
Expand Down
16 changes: 0 additions & 16 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1689,22 +1689,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
     assert(e.message.contains("Hive built-in ORC data source must be used with Hive support"))
 
-    e = intercept[AnalysisException] {
-      sql(s"select id from `com.databricks.spark.avro`.`file_path`")
-    }
-    assert(e.message.contains("Failed to find data source: com.databricks.spark.avro."))
-
-    // data source type is case insensitive
-    e = intercept[AnalysisException] {
-      sql(s"select id from Avro.`file_path`")
-    }
-    assert(e.message.contains("Failed to find data source: avro."))
-
-    e = intercept[AnalysisException] {
-      sql(s"select id from avro.`file_path`")
-    }
-    assert(e.message.contains("Failed to find data source: avro."))
-
     e = intercept[AnalysisException] {
       sql(s"select id from `org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`")
     }
@@ -77,19 +77,9 @@ class ResolvedDataSourceSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("error message for unknown data sources") {
-    val error1 = intercept[AnalysisException] {
-      getProvidingClass("avro")
-    }
-    assert(error1.getMessage.contains("Failed to find data source: avro."))
-
-    val error2 = intercept[AnalysisException] {
-      getProvidingClass("com.databricks.spark.avro")
-    }
-    assert(error2.getMessage.contains("Failed to find data source: com.databricks.spark.avro."))
-
-    val error3 = intercept[ClassNotFoundException] {
+    val error = intercept[ClassNotFoundException] {
       getProvidingClass("asfdwefasdfasdf")
     }
-    assert(error3.getMessage.contains("Failed to find data source: asfdwefasdfasdf."))
+    assert(error.getMessage.contains("Failed to find data source: asfdwefasdfasdf."))
   }
 }
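
Taken together, the mapping means both provider names reach the built-in Avro implementation. A minimal usage sketch (hypothetical path, assuming the built-in spark-avro module is on the classpath):

// Both names now resolve to org.apache.spark.sql.avro.AvroFileFormat.
val viaShortName = spark.read.format("avro").load("/tmp/episodes.avro")
val viaLegacyName = spark.read.format("com.databricks.spark.avro").load("/tmp/episodes.avro")
assert(viaShortName.schema === viaLegacyName.schema)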