forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 0
update #10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
update #10
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…ng Hive. A change in Hive 2.2 (most probably HIVE-13149) causes this code path to fail, since the call to "state.getConf.setClassLoader" does not actually change the context's class loader. Spark doesn't yet officially support Hive 2.2, but some distribution-specific metastore client libraries may have that change (as certain versions of CDH already do), and this also makes it easier to support 2.2 when it comes out. Tested with existing unit tests; we've also used this patch extensively with Hive metastore client jars containing the offending patch. Author: Marcelo Vanzin <[email protected]> Closes apache#17154 from vanzin/SPARK-19804.
…esn't recover the log level ## What changes were proposed in this pull request? "DataFrameCallbackSuite.execute callback functions when a DataFrame action failed" sets the log level to "fatal" but doesn't recover it. Hence, tests running after it won't output any logs except fatal logs. This PR uses `testQuietly` instead to avoid changing the log level. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes apache#17156 from zsxwing/SPARK-19816.
## What changes were proposed in this pull request? Update R document to use JDK8. ## How was this patch tested? manual tests Author: Yuming Wang <[email protected]> Closes apache#17162 from wangyum/SPARK-19550.
…Node” ,I think it is not all right Signed-off-by: liuxian <liu.xian3zte.com.cn> ## What changes were proposed in this pull request? Open the spark web page,in the Master Page ,have two tables:Running Applications table and Completed Applications table, to the column named “Memory per Node” ,I think it is not all right ,because a node may be not have only one executor.So I think that should be named as “Memory per Executor”.Otherwise easy to let the user misunderstanding ## How was this patch tested? N/A Author: liuxian <[email protected]> Closes apache#17132 from 10110346/wid-lx-0302.
## What changes were proposed in this pull request? improve the log message when query result does not match. before pr: ``` == Results == !== Correct Answer - 3 == == Spark Answer - 3 == [1] [1] [2] [2] [3] [3] ``` after pr: ~~== Results == !== Correct Answer - 3 == == Spark Answer - 3 == !RowType[string] RowType[integer] [1] [1] [2] [2] [3] [3]~~ ``` == Results == !== Correct Answer - 3 == == Spark Answer - 3 == !struct<value:string> struct<value:int> [1] [1] [2] [2] [3] [3] ``` ## How was this patch tested? Jenkins Author: uncleGen <[email protected]> Closes apache#17145 from uncleGen/improve-test-result.
## What changes were proposed in this pull request? This pr is to support Seq, Map, and Struct in functions.lit; it adds a new IF named `lit2` with `TypeTag` for avoiding type erasure. ## How was this patch tested? Added tests in `LiteralExpressionSuite` Author: Takeshi Yamamuro <[email protected]> Author: Takeshi YAMAMURO <[email protected]> Closes apache#16610 from maropu/SPARK-19254.
## What changes were proposed in this pull request? Add column functions: to_json, from_json, and tests covering error cases. ## How was this patch tested? unit tests, manual Author: Felix Cheung <[email protected]> Closes apache#17134 from felixcheung/rtojson.
Update from origin
## What changes were proposed in this pull request?
This PR proposes to both,
**Do not allow json arrays with multiple elements and return null in `from_json` with `StructType` as the schema.**
Currently, it only reads the single row when the input is a json array. So, the codes below:
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = StructType(StructField("a", IntegerType) :: Nil)
Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("struct").select(from_json(col("struct"), schema)).show()
```
prints
```
+--------------------+
|jsontostruct(struct)|
+--------------------+
| [1]|
+--------------------+
```
This PR simply suggests to print this as `null` if the schema is `StructType` and input is json array.with multiple elements
```
+--------------------+
|jsontostruct(struct)|
+--------------------+
| null|
+--------------------+
```
**Support json arrays in `from_json` with `ArrayType` as the schema.**
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("array").select(from_json(col("array"), schema)).show()
```
prints
```
+-------------------+
|jsontostruct(array)|
+-------------------+
| [[1], [2]]|
+-------------------+
```
## How was this patch tested?
Unit test in `JsonExpressionsSuite`, `JsonFunctionsSuite`, Python doctests and manual test.
Author: hyukjinkwon <[email protected]>
Closes apache#16929 from HyukjinKwon/disallow-array.
…n Dataframe ## What changes were proposed in this pull request? This is a simple implementation of RecommendForAllUsers & RecommendForAllItems for the Dataframe version of ALS. It uses Dataframe operations (not a wrapper on the RDD implementation). Haven't benchmarked against a wrapper, but unit test examples do work. ## How was this patch tested? Unit tests ``` $ build/sbt > mllib/testOnly *ALSSuite -- -z "recommendFor" > mllib/testOnly ``` Author: Your Name <[email protected]> Author: sueann <[email protected]> Closes apache#17090 from sueann/SPARK-19535.
…or against column
## What changes were proposed in this pull request?
This PR proposes to remove incorrect implementation that has been not executed so far (at least from Spark 1.5.2) for `in` operator and throw a correct exception rather than saying it is a bool. I tested the codes above in 1.5.2, 1.6.3, 2.1.0 and in the master branch as below:
**1.5.2**
```python
>>> df = sqlContext.createDataFrame([[1]])
>>> 1 in df._1
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark-1.5.2-bin-hadoop2.6/python/pyspark/sql/column.py", line 418, in __nonzero__
raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
```
**1.6.3**
```python
>>> 1 in sqlContext.range(1).id
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark-1.6.3-bin-hadoop2.6/python/pyspark/sql/column.py", line 447, in __nonzero__
raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
```
**2.1.0**
```python
>>> 1 in spark.range(1).id
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.py", line 426, in __nonzero__
raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
```
**Current Master**
```python
>>> 1 in spark.range(1).id
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark/python/pyspark/sql/column.py", line 452, in __nonzero__
raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
```
**After**
```python
>>> 1 in spark.range(1).id
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark/python/pyspark/sql/column.py", line 184, in __contains__
raise ValueError("Cannot apply 'in' operator against a column: please use 'contains' "
ValueError: Cannot apply 'in' operator against a column: please use 'contains' in a string column or 'array_contains' function for an array column.
```
In more details,
It seems the implementation intended to support this
```python
1 in df.column
```
However, currently, it throws an exception as below:
```python
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File ".../spark/python/pyspark/sql/column.py", line 426, in __nonzero__
raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
```
What happens here is as below:
```python
class Column(object):
def __contains__(self, item):
print "I am contains"
return Column()
def __nonzero__(self):
raise Exception("I am nonzero.")
>>> 1 in Column()
I am contains
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 6, in __nonzero__
Exception: I am nonzero.
```
It seems it calls `__contains__` first and then `__nonzero__` or `__bool__` is being called against `Column()` to make this a bool (or int to be specific).
It seems `__nonzero__` (for Python 2), `__bool__` (for Python 3) and `__contains__` forcing the the return into a bool unlike other operators. There are few references about this as below:
https://bugs.python.org/issue16011
http://stackoverflow.com/questions/12244074/python-source-code-for-built-in-in-operator/12244378#12244378
http://stackoverflow.com/questions/38542543/functionality-of-python-in-vs-contains/38542777
It seems we can't overwrite `__nonzero__` or `__bool__` as a workaround to make this working because these force the return type as a bool as below:
```python
class Column(object):
def __contains__(self, item):
print "I am contains"
return Column()
def __nonzero__(self):
return "a"
>>> 1 in Column()
I am contains
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: __nonzero__ should return bool or int, returned str
```
## How was this patch tested?
Added unit tests in `tests.py`.
Author: hyukjinkwon <[email protected]>
Closes apache#17160 from HyukjinKwon/SPARK-19701.
… not filter checkpointFilesOfLatestTime with the PATH string. ## What changes were proposed in this pull request? https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73800/testReport/ ``` sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 617 times over 10.003740484 seconds. Last failure message: 8 did not equal 2. at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420) at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438) at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478) at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336) at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478) at org.apache.spark.streaming.DStreamCheckpointTester$class.generateOutput(CheckpointSuite .scala:172) at org.apache.spark.streaming.CheckpointSuite.generateOutput(CheckpointSuite.scala:211) ``` the check condition is: ``` val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter { _.toString.contains(clock.getTimeMillis.toString) } // Checkpoint files are written twice for every batch interval. So assert that both // are written to make sure that both of them have been written. assert(checkpointFilesOfLatestTime.size === 2) ``` the path string may contain the `clock.getTimeMillis.toString`, like `3500` : ``` file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-500 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1000 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1500 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2000 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2500 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3000 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500.bk file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500 ▲▲▲▲ ``` so we should only check the filename, but not the whole path. ## How was this patch tested? Jenkins. Author: uncleGen <[email protected]> Closes apache#17167 from uncleGen/flaky-CheckpointSuite.
## What changes were proposed in this pull request? Hive hash to support Decimal datatype. [Hive internally normalises decimals](https://github.com/apache/hive/blob/4ba713ccd85c3706d195aeef9476e6e6363f1c21/storage-api/src/java/org/apache/hadoop/hive/common/type/HiveDecimalV1.java#L307) and I have ported that logic as-is to HiveHash. ## How was this patch tested? Added unit tests Author: Tejas Patil <[email protected]> Closes apache#17056 from tejasapatil/SPARK-17495_decimal.
…ions without relying on relation resolution ## What changes were proposed in this pull request? This PR adds a new `Once` analysis rule batch consists of a single analysis rule `LookupFunctions` that performs simple existence check over `UnresolvedFunctions` without actually resolving them. The benefit of this rule is that it doesn't require function arguments to be resolved first and therefore doesn't rely on relation resolution, which may incur potentially expensive partition/schema discovery cost. Please refer to [SPARK-19737][1] for more details about the motivation. ## How was this patch tested? New test case added in `AnalysisErrorSuite`. [1]: https://issues.apache.org/jira/browse/SPARK-19737 Author: Cheng Lian <[email protected]> Closes apache#17168 from liancheng/spark-19737-lookup-functions.
## What changes were proposed in this pull request? added a limit to getRecords api call call in KinesisBackedBlockRdd. This helps reduce the amount of data returned by kinesis api call making the recovery considerably faster As we are storing the `fromSeqNum` & `toSeqNum` in checkpoint metadata, we can also store the number of records. Which can later be used for api call. ## How was this patch tested? The patch was manually tested Apologies for any silly mistakes, opening first pull request Author: Gaurav <[email protected]> Closes apache#16842 from Gauravshah/kinesis_checkpoint_recovery_fix_2_1_0.
…va.net.URI
## What changes were proposed in this pull request?
Currently we treat the location of table/partition/database as URI string.
It will be safer if we can make the type of location as java.net.URI.
In this PR, there are following classes changes:
**1. CatalogDatabase**
```
case class CatalogDatabase(
name: String,
description: String,
locationUri: String,
properties: Map[String, String])
--->
case class CatalogDatabase(
name: String,
description: String,
locationUri: URI,
properties: Map[String, String])
```
**2. CatalogStorageFormat**
```
case class CatalogStorageFormat(
locationUri: Option[String],
inputFormat: Option[String],
outputFormat: Option[String],
serde: Option[String],
compressed: Boolean,
properties: Map[String, String])
---->
case class CatalogStorageFormat(
locationUri: Option[URI],
inputFormat: Option[String],
outputFormat: Option[String],
serde: Option[String],
compressed: Boolean,
properties: Map[String, String])
```
Before and After this PR, it is transparent for user, there is no change that the user should concern. The `String` to `URI` just happened in SparkSQL internally.
Here list some operation related location:
**1. whitespace in the location**
e.g. `/a/b c/d`
For both table location and partition location,
After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b c/d'` ,
then `DESC EXTENDED t ` show the location is `/a/b c/d`,
and the real path in the FileSystem also show `/a/b c/d`
**2. colon(:) in the location**
e.g. `/a/b:c/d`
For both table location and partition location,
when `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b:c/d'` ,
**In linux file system**
`DESC EXTENDED t ` show the location is `/a/b:c/d`,
and the real path in the FileSystem also show `/a/b:c/d`
**in HDFS** throw exception:
`java.lang.IllegalArgumentException: Pathname /a/b:c/d from hdfs://iZbp1151s8hbnnwriekxdeZ:9000/a/b:c/d is not a valid DFS filename.`
**while** After `INSERT INTO TABLE t PARTITION(a="a:b") SELECT 1`
then `DESC EXTENDED t ` show the location is `/xxx/a=a%3Ab`,
and the real path in the FileSystem also show `/xxx/a=a%3Ab`
**3. percent sign(%) in the location**
e.g. `/a/b%c/d`
For both table location and partition location,
After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%c/d'` ,
then `DESC EXTENDED t ` show the location is `/a/b%c/d`,
and the real path in the FileSystem also show `/a/b%c/d`
**4. encoded(%25) in the location**
e.g. `/a/b%25c/d`
For both table location and partition location,
After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%25c/d'` ,
then `DESC EXTENDED t ` show the location is `/a/b%25c/d`,
and the real path in the FileSystem also show `/a/b%25c/d`
**while** After `INSERT INTO TABLE t PARTITION(a="%25") SELECT 1`
then `DESC EXTENDED t ` show the location is `/xxx/a=%2525`,
and the real path in the FileSystem also show `/xxx/a=%2525`
**Additionally**, except the location, there are two other factors will affect the location of the table/partition. one is the table name which does not allowed to have special characters, and the other is `partition name` which have the same actions with `partition value`, and `partition name` with special character situation has add some testcase and resolve a bug in [PR](apache#17173)
### Summary:
After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION path`,
the path which we get from `DESC TABLE` and `real path in FileSystem` are all the same with the `CREATE TABLE` command(different filesystem has different action that allow what kind of special character to create the path, e.g. HDFS does not allow colon, but linux filesystem allow it ).
`DataBase` also have the same logic with `CREATE TABLE`
while if the `partition value` has some special character like `%` `:` `#` etc, then we will get the path with encoded `partition value` like `/xxx/a=A%25B` from `DESC TABLE` and `real path in FileSystem`
In this PR, the core change code is using `new Path(str).toUri` and `new Path(uri).toString`
which transfrom `str to uri `or `uri to str`.
for example:
```
val str = '/a/b c/d'
val uri = new Path(str).toUri --> '/a/b%20c/d'
val strFromUri = new Path(uri).toString -> '/a/b c/d'
```
when we restore table/partition from metastore, or get the location from `CREATE TABLE` command, we can use it as above to change string to uri `new Path(str).toUri `
## How was this patch tested?
unit test added.
The `current master branch` also `passed all the test cases` added in this PR by a litter change.
https://github.com/apache/spark/pull/17149/files#diff-b7094baa12601424a5d19cb930e3402fR1764
here `toURI` -> `toString` when test in master branch.
This can show that this PR is transparent for user.
Author: windpiger <[email protected]>
Closes apache#17149 from windpiger/changeStringToURI.
…Description ## What changes were proposed in this pull request? The properties that are serialized with a TaskDescription can have very long values (eg. "spark.job.description" which is set to the full sql statement with the thrift-server). DataOutputStream.writeUTF() does not work well for long strings, so this changes the way those values are serialized to handle longer strings. ## How was this patch tested? Updated existing unit test to reproduce the issue. All unit tests via jenkins. Author: Imran Rashid <[email protected]> Closes apache#17140 from squito/SPARK-19796.
… As Insert
## What changes were proposed in this pull request?
Currently we don't explicitly forbid the following behaviors:
1. The statement CREATE VIEW AS INSERT INTO throws the following exception:
```
scala> spark.sql("CREATE VIEW testView AS INSERT INTO tab VALUES (1, \"a\")")
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: at least one column must be specified for the table;
scala> spark.sql("CREATE VIEW testView(a, b) AS INSERT INTO tab VALUES (1, \"a\")")
org.apache.spark.sql.AnalysisException: The number of columns produced by the SELECT clause (num: `0`) does not match the number of column names specified by CREATE VIEW (num: `2`).;
```
2. The statement INSERT INTO view VALUES throws the following exception from checkAnalysis:
```
scala> spark.sql("INSERT INTO testView VALUES (1, \"a\")")
org.apache.spark.sql.AnalysisException: Inserting into an RDD-based table is not allowed.;;
'InsertIntoTable View (`default`.`testView`, [a#16,b#17]), false, false
+- LocalRelation [col1#14, col2#15]
```
After this PR, the behavior changes to:
```
scala> spark.sql("CREATE VIEW testView AS INSERT INTO tab VALUES (1, \"a\")")
org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: CREATE VIEW ... AS INSERT INTO;
scala> spark.sql("CREATE VIEW testView(a, b) AS INSERT INTO tab VALUES (1, \"a\")")
org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: CREATE VIEW ... AS INSERT INTO;
scala> spark.sql("INSERT INTO testView VALUES (1, \"a\")")
org.apache.spark.sql.AnalysisException: `default`.`testView` is a view, inserting into a view is not allowed;
```
## How was this patch tested?
Add a new test case in `SparkSqlParserSuite`;
Update the corresponding test case in `SQLViewSuite`.
Author: jiangxingbo <[email protected]>
Closes apache#17125 from jiangxb1987/insert-with-view.
## What changes were proposed in this pull request?
Add unit tests for testing SparseVector.
We can't add mixed DenseVector and SparseVector test case, as discussed in JIRA 19382.
def merge(other: MultivariateOnlineSummarizer): this.type = {
if (this.totalWeightSum != 0.0 && other.totalWeightSum != 0.0) {
require(n == other.n, s"Dimensions mismatch when merging with another summarizer. " +
s"Expecting $n but got $
{other.n}
.")
## How was this patch tested?
Unit tests
Author: [email protected] <[email protected]>
Author: Miao Wang <[email protected]>
Closes apache#16784 from wangmiao1981/bk.
## What changes were proposed in this pull request? Bugfix for reading empty file with CSV data source. Instead of throwing `NoSuchElementException`, an empty data frame is returned. ## How was this patch tested? Added new unit test in `org.apache.spark.sql.execution.datasources.csv.CSVSuite` Author: Wojtek Szymanski <[email protected]> Closes apache#17068 from wojtek-szymanski/SPARK-19709.
…h queires
## What changes were proposed in this pull request?
Add a new Kafka Sink and Kafka Relation for writing streaming and batch queries, respectively, to Apache Kafka.
### Streaming Kafka Sink
- When addBatch is called
-- If batchId is great than the last written batch
--- Write batch to Kafka
---- Topic will be taken from the record, if present, or from a topic option, which overrides topic in record.
-- Else ignore
### Batch Kafka Sink
- KafkaSourceProvider will implement CreatableRelationProvider
- CreatableRelationProvider#createRelation will write the passed in Dataframe to a Kafka
- Topic will be taken from the record, if present, or from topic option, which overrides topic in record.
- Save modes Append and ErrorIfExist supported under identical semantics. Other save modes result in an AnalysisException
tdas zsxwing
## How was this patch tested?
### The following unit tests will be included
- write to stream with topic field: valid stream write with data that includes an existing topic in the schema
- write structured streaming aggregation w/o topic field, with default topic: valid stream write with data that does not include a topic field, but the configuration includes a default topic
- write data with bad schema: various cases of writing data that does not conform to a proper schema e.g., 1. no topic field or default topic, and 2. no value field
- write data with valid schema but wrong types: data with a complete schema but wrong types e.g., key and value types are integers.
- write to non-existing topic: write a stream to a topic that does not exist in Kafka, which has been configured to not auto-create topics.
- write batch to kafka: simple write batch to Kafka, which goes through the same code path as streaming scenario, so validity checks will not be redone here.
### Examples
```scala
// Structured Streaming
val writer = inputStringStream.map(s => s.get(0).toString.getBytes()).toDF("value")
.selectExpr("value as key", "value as value")
.writeStream
.format("kafka")
.option("checkpointLocation", checkpointDir)
.outputMode(OutputMode.Append)
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", topic)
.queryName("kafkaStream")
.start()
// Batch
val df = spark
.sparkContext
.parallelize(Seq("1", "2", "3", "4", "5"))
.map(v => (topic, v))
.toDF("topic", "value")
df.write
.format("kafka")
.option("kafka.bootstrap.servers",brokerAddress)
.option("topic", topic)
.save()
```
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Tyson Condie <[email protected]>
Closes apache#17043 from tcondie/kafka-writer.
## What changes were proposed in this pull request? Before this pr, LocalLimit/GlobalLimit/Sample propagates the same row count and column stats from its child, which is incorrect. We can get the correct rowCount in Statistics for GlobalLimit/Sample whether cbo is enabled or not. We don't know the rowCount for LocalLimit because we don't know the partition number at that time. Column stats should not be propagated because we don't know the distribution of columns after Limit or Sample. ## How was this patch tested? Added test cases. Author: wangzhenhua <[email protected]> Closes apache#16696 from wzhfy/limitEstimation.
…t data frames ## What changes were proposed in this pull request? Added checks for name consistency of input data frames in union. ## How was this patch tested? new test. Author: actuaryzhang <[email protected]> Closes apache#17159 from actuaryzhang/sparkRUnion.
… escape the partition name
## What changes were proposed in this pull request?
Currently in DynamicPartitionWriteTask, when we get the paritionPath of a parition, we just escape the partition value, not escape the partition name.
this will cause some problems for some special partition name situation, for example :
1) if the partition name contains '%' etc, there will be two partition path created in the filesytem, one is for escaped path like '/path/a%25b=1', another is for unescaped path like '/path/a%b=1'.
and the data inserted stored in unescaped path, while the show partitions table will return 'a%25b=1' which the partition name is escaped. So here it is not consist. And I think the data should be stored in the escaped path in filesystem, which Hive2.0.0 also have the same action.
2) if the partition name contains ':', there will throw exception that new Path("/path","a:b"), this is illegal which has a colon in the relative path.
```
java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: a:b
at org.apache.hadoop.fs.Path.initialize(Path.java:205)
at org.apache.hadoop.fs.Path.<init>(Path.java:171)
at org.apache.hadoop.fs.Path.<init>(Path.java:88)
... 48 elided
Caused by: java.net.URISyntaxException: Relative path in absolute URI: a:b
at java.net.URI.checkPath(URI.java:1823)
at java.net.URI.<init>(URI.java:745)
at org.apache.hadoop.fs.Path.initialize(Path.java:202)
... 50 more
```
## How was this patch tested?
unit test added
Author: windpiger <[email protected]>
Closes apache#17173 from windpiger/fixDatasourceSpecialCharPartitionName.
## What changes were proposed in this pull request? 1. support boolean type in binary expression estimation. 2. deal with compound Not conditions. 3. avoid convert BigInt/BigDecimal directly to double unless it's within range (0, 1). 4. reorganize test code. ## How was this patch tested? modify related test cases. Author: wangzhenhua <[email protected]> Author: Zhenhua Wang <[email protected]> Closes apache#17148 from wzhfy/fixFilter.
## What changes were proposed in this pull request? This pr added entries in `FunctionRegistry` and supported `to_json` in SQL. ## How was this patch tested? Added tests in `JsonFunctionsSuite`. Author: Takeshi Yamamuro <[email protected]> Closes apache#16981 from maropu/SPARK-19637.
…hed plans that refer to this table ## What changes were proposed in this pull request? When un-cache a table, we should not only remove the cache entry for this table, but also un-cache any other cached plans that refer to this table. This PR also includes some refactors: 1. use `java.util.LinkedList` to store the cache entries, so that it's safer to remove elements while iterating 2. rename `invalidateCache` to `recacheByPlan`, which is more obvious about what it does. ## How was this patch tested? new regression test Author: Wenchen Fan <[email protected]> Closes apache#17097 from cloud-fan/cache.
## What changes were proposed in this pull request?
This PR is an enhancement to ML StringIndexer.
Before this PR, String Indexer only supports "skip"/"error" options to deal with unseen records.
But those unseen records might still be useful and user would like to keep the unseen labels in
certain use cases, This PR enables StringIndexer to support keeping unseen labels as
indices [numLabels].
'''Before
StringIndexer().setHandleInvalid("skip")
StringIndexer().setHandleInvalid("error")
'''After
support the third option "keep"
StringIndexer().setHandleInvalid("keep")
## How was this patch tested?
Test added in StringIndexerSuite
Signed-off-by: VinceShieh <vincent.xieintel.com>
(Please fill in changes proposed in this fix)
Author: VinceShieh <[email protected]>
Closes apache#16883 from VinceShieh/spark-17498.
…parkContext ## What changes were proposed in this pull request? After Spark 2.0, `SparkSession` becomes the new entry point for Spark applications. We should update the public documents to reflect this. ## How was this patch tested? N/A Author: Wenchen Fan <[email protected]> Closes apache#16856 from cloud-fan/doc.
## What changes were proposed in this pull request? 200ms may be too short. Give more time for replication to happen and new block be reported to master ## How was this patch tested? test manully Author: uncleGen <[email protected]> Author: dylon <[email protected]> Closes apache#17144 from uncleGen/SPARK-19803.
… in combination with maxFileAge in FileStreamSource ## What changes were proposed in this pull request? **The Problem** There is a file stream source option called maxFileAge which limits how old the files can be, relative the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed files are ignored. This values is by default 7 days. This causes a problem when both latestFirst = true maxFilesPerTrigger > total files to be processed. Here is what happens in all combinations 1) latestFirst = false - Since files are processed in order, there wont be any unprocessed file older than the latest processed file. All files will be processed. 2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch initialize. If maxFilesPerTrigger is not, then all old files get processed in the first batch, and so no file is left behind. 3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch process the latest X files. That sets the threshold latest file - maxFileAge, so files older than this threshold will never be considered for processing. The bug is with case 3. **The Solution** Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set. ## How was this patch tested? Regression test in `FileStreamSourceSuite` Author: Burak Yavuz <[email protected]> Closes apache#17153 from brkyvz/maxFileAge.
…isException is raised from analyzer. ## What changes were proposed in this pull request? In general we have a checkAnalysis phase which validates the logical plan and throws AnalysisException on semantic errors. However we also can throw AnalysisException from a few analyzer rules like ResolveSubquery. I found that we fire up the analyzer rules twice for the queries that throw AnalysisException from one of the analyzer rules. This is a very minor fix. We don't have to strictly fix it. I just got confused seeing the rule getting fired two times when i was not expecting it. ## How was this patch tested? Tested manually. Author: Dilip Biswal <[email protected]> Closes apache#17214 from dilipbiswal/analyis_twice.
…astore
### What changes were proposed in this pull request?
So far, the test cases in DDLSuites only verify the behaviors of InMemoryCatalog. That means, they do not cover the scenarios using HiveExternalCatalog. Thus, we need to improve the existing test suite to run these cases using Hive metastore.
When porting these test cases, a bug of `SET LOCATION` is found. `path` is not set when the location is changed.
After this PR, a few changes are made, as summarized below,
- `DDLSuite` becomes an abstract class. Both `InMemoryCatalogedDDLSuite` and `HiveCatalogedDDLSuite` extend it. `InMemoryCatalogedDDLSuite` is using `InMemoryCatalog`. `HiveCatalogedDDLSuite` is using `HiveExternalCatalog`.
- `InMemoryCatalogedDDLSuite` contains all the existing test cases in `DDLSuite`.
- `HiveCatalogedDDLSuite` contains a subset of `DDLSuite`. The following test cases are excluded:
1. The following test cases only make sense for `InMemoryCatalog`:
```
test("desc table for parquet data source table using in-memory catalog")
test("create a managed Hive source table") {
test("create an external Hive source table")
test("Create Hive Table As Select")
```
2. The following test cases are unable to be ported because we are unable to alter table provider when using Hive metastore. In the future PRs we need to improve the test cases so that altering table provider is not needed:
```
test("alter table: set location (datasource table)")
test("alter table: set properties (datasource table)")
test("alter table: unset properties (datasource table)")
test("alter table: set serde (datasource table)")
test("alter table: set serde partition (datasource table)")
test("alter table: change column (datasource table)")
test("alter table: add partition (datasource table)")
test("alter table: drop partition (datasource table)")
test("alter table: rename partition (datasource table)")
test("drop table - data source table")
```
**TODO** : in the future PRs, we need to remove `HiveDDLSuite` and move the test cases to either `DDLSuite`, `InMemoryCatalogedDDLSuite` or `HiveCatalogedDDLSuite`.
### How was this patch tested?
N/A
Author: Xiao Li <[email protected]>
Author: gatorsmile <[email protected]>
Closes apache#16592 from gatorsmile/refactorDDLSuite.
## What changes were proposed in this pull request? The API docs should not include the "org.apache.spark.sql.internal" package because they are internal private APIs. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes apache#17217 from zsxwing/SPARK-19874.
…d one. ## What changes were proposed in this pull request? A follow up to SPARK-19859: - extract the calculation of `delayMs` and reuse it. - update EventTimeWatermarkExec - use the correct `delayMs` in EventTimeWatermark ## How was this patch tested? Jenkins. Author: uncleGen <[email protected]> Closes apache#17221 from uncleGen/SPARK-19859.
…d in catalog ## What changes were proposed in this pull request? If we create a external datasource table with a non-qualified location , we should qualified it to store in catalog. ``` CREATE TABLE t(a string) USING parquet LOCATION '/path/xx' CREATE TABLE t1(a string, b string) USING parquet PARTITIONED BY(b) LOCATION '/path/xx' ``` when we get the table from catalog, the location should be qualified, e.g.'file:/path/xxx' ## How was this patch tested? unit test added Author: windpiger <[email protected]> Closes apache#17095 from windpiger/tablepathQualified.
## What changes were proposed in this pull request? Add handling of input of type `Int` for dataType `TimestampType` to `EvaluatePython.scala`. Py4J serializes ints smaller than MIN_INT or larger than MAX_INT to Long, which are handled correctly already, but values between MIN_INT and MAX_INT are serialized to Int. These range limits correspond to roughly half an hour on either side of the epoch. As a result, PySpark doesn't allow TimestampType values to be created in this range. Alternatives attempted: patching the `TimestampType.toInternal` function to cast return values to `long`, so Py4J would always serialize them to Scala Long. Python3 does not have a `long` type, so this approach failed on Python3. ## How was this patch tested? Added a new PySpark-side test that fails without the change. The contribution is my original work and I license the work to the project under the project’s open source license. Resubmission of apache#16896. The original PR didn't go through Jenkins and broke the build. davies dongjoon-hyun cloud-fan Could you kick off a Jenkins run for me? It passed everything for me locally, but it's possible something has changed in the last few weeks. Author: Jason White <[email protected]> Closes apache#17200 from JasonMWhite/SPARK-19561.
…nedSchedulerBackend#killExecutors ## What changes were proposed in this pull request? While some executors are being killed due to idleness, if some new tasks come in, driver could assign them to some executors are being killed. These tasks will fail later when the executors are lost. This patch is to make sure CoarseGrainedSchedulerBackend#killExecutors and DriverEndpoint#makeOffers are properly synchronized. ## How was this patch tested? manual tests Author: Jimmy Xiang <[email protected]> Closes apache#17091 from jxiang/spark-19757.
…askSetManager. ## What changes were proposed in this pull request? TaskSetManager is now using `System.getCurrentTimeMillis` when mark task as finished in `handleSuccessfulTask` and `handleFailedTask`. Thus developer cannot set the tasks finishing time in unit test. When `handleSuccessfulTask`, task's duration = `System.getCurrentTimeMillis` - launchTime(which can be set by `clock`), the result is not correct. ## How was this patch tested? Existing tests. Author: jinxing <[email protected]> Closes apache#17133 from jinxing64/SPARK-19793.
## What changes were proposed in this pull request?
Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this would cause false negatives in the case where the path has changed in a cosmetic way (i.e. changing `s3n` to `s3a`).
This patch adds an option `fileNameOnly` that causes the new file check to be based only on the filename (but still store the whole path in the log).
## Usage
```scala
spark
.readStream
.option("fileNameOnly", true)
.text("s3n://bucket/dir1/dir2")
.writeStream
...
```
## How was this patch tested?
Added a test case
Author: Liwei Lin <[email protected]>
Closes apache#17120 from lw-lin/filename-only.
## What changes were proposed in this pull request? `watermark` should not be negative. This behavior is invalid, check it before real run. ## How was this patch tested? add new unit test. Author: uncleGen <[email protected]> Author: dylon <[email protected]> Closes apache#17202 from uncleGen/SPARK-19861.
…r orc file in DataFrameReader.orc Beside the issue in spark api, also fix 2 minor issues in pyspark - support read from multiple input paths for orc - support read from multiple input paths for text Author: Jeff Zhang <[email protected]> Closes apache#10307 from zjffdu/SPARK-12334.
## Summary of changes Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties. - Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf - Add schemaPreservesCase field to CatalogTable (set to false when schema can't successfully be read from Hive table props) - Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode - Add alterTableSchema() method to the ExternalCatalog interface - Add HiveSchemaInferenceSuite tests - Refactor and move ParquetFileForamt.meregeMetastoreParquetSchema() as HiveMetastoreCatalog.mergeWithMetastoreSchema - Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite [JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611) ## How was this patch tested? The tests in ```HiveSchemaInferenceSuite``` should verify that schema inference is working as expected. ```ExternalCatalogSuite``` has also been extended to cover the new ```alterTableSchema()``` API. Author: Budde <[email protected]> Closes apache#16944 from budde/SPARK-19611.
## What changes were proposed in this pull request? Fix the `throw new IllegalStateException` if statement part. ## How is this patch tested Regression test Author: Burak Yavuz <[email protected]> Closes apache#17228 from brkyvz/kafka-cause-fix.
…boxing/unboxing ## What changes were proposed in this pull request? This PR improve performance of Dataset.map() for primitive types by removing boxing/unbox operations. This is based on [the discussion](apache#16391 (comment)) with cloud-fan. Current Catalyst generates a method call to a `apply()` method of an anonymous function written in Scala. The types of an argument and return value are `java.lang.Object`. As a result, each method call for a primitive value involves a pair of unboxing and boxing for calling this `apply()` method and a pair of boxing and unboxing for returning from this `apply()` method. This PR directly calls a specialized version of a `apply()` method without boxing and unboxing. For example, if types of an arguments ant return value is `int`, this PR generates a method call to `apply$mcII$sp`. This PR supports any combination of `Int`, `Long`, `Float`, and `Double`. The following is a benchmark result using [this program](https://github.com/apache/spark/pull/16391/files) with 4.7x. Here is a Dataset part of this program. Without this PR ``` OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ RDD 1923 / 1952 52.0 19.2 1.0X DataFrame 526 / 548 190.2 5.3 3.7X Dataset 3094 / 3154 32.3 30.9 0.6X ``` With this PR ``` OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ RDD 1883 / 1892 53.1 18.8 1.0X DataFrame 502 / 642 199.1 5.0 3.7X Dataset 657 / 784 152.2 6.6 2.9X ``` ```java def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = { import spark.implicits._ val rdd = spark.sparkContext.range(0, numRows) val ds = spark.range(0, numRows) val func = (l: Long) => l + 1 val benchmark = new Benchmark("back-to-back map", numRows) ... benchmark.addCase("Dataset") { iter => var res = ds.as[Long] var i = 0 while (i < numChains) { res = res.map(func) i += 1 } res.queryExecution.toRdd.foreach(_ => Unit) } benchmark } ``` A motivating example ```java Seq(1, 2, 3).toDS.map(i => i * 7).show ``` Generated code without this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private UnsafeRow deserializetoobject_result; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 012 */ private int mapelements_argValue; /* 013 */ private UnsafeRow mapelements_result; /* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 016 */ private UnsafeRow serializefromobject_result; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 019 */ /* 020 */ public GeneratedIterator(Object[] references) { /* 021 */ this.references = references; /* 022 */ } /* 023 */ /* 024 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 025 */ partitionIndex = index; /* 026 */ this.inputs = inputs; /* 027 */ inputadapter_input = inputs[0]; /* 028 */ deserializetoobject_result = new UnsafeRow(1); /* 029 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0); /* 030 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 031 */ /* 032 */ mapelements_result = new UnsafeRow(1); /* 033 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0); /* 034 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 035 */ serializefromobject_result = new UnsafeRow(1); /* 036 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0); /* 037 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 038 */ /* 039 */ } /* 040 */ /* 041 */ protected void processNext() throws java.io.IOException { /* 042 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 043 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 044 */ int inputadapter_value = inputadapter_row.getInt(0); /* 045 */ /* 046 */ boolean mapelements_isNull = true; /* 047 */ int mapelements_value = -1; /* 048 */ if (!false) { /* 049 */ mapelements_argValue = inputadapter_value; /* 050 */ /* 051 */ mapelements_isNull = false; /* 052 */ if (!mapelements_isNull) { /* 053 */ Object mapelements_funcResult = null; /* 054 */ mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue); /* 055 */ if (mapelements_funcResult == null) { /* 056 */ mapelements_isNull = true; /* 057 */ } else { /* 058 */ mapelements_value = (Integer) mapelements_funcResult; /* 059 */ } /* 060 */ /* 061 */ } /* 062 */ /* 063 */ } /* 064 */ /* 065 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 066 */ /* 067 */ if (mapelements_isNull) { /* 068 */ serializefromobject_rowWriter.setNullAt(0); /* 069 */ } else { /* 070 */ serializefromobject_rowWriter.write(0, mapelements_value); /* 071 */ } /* 072 */ append(serializefromobject_result); /* 073 */ if (shouldStop()) return; /* 074 */ } /* 075 */ } /* 076 */ } ``` Generated code with this PR (lines 48-56 are changed) ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private UnsafeRow deserializetoobject_result; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 012 */ private int mapelements_argValue; /* 013 */ private UnsafeRow mapelements_result; /* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 016 */ private UnsafeRow serializefromobject_result; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 019 */ /* 020 */ public GeneratedIterator(Object[] references) { /* 021 */ this.references = references; /* 022 */ } /* 023 */ /* 024 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 025 */ partitionIndex = index; /* 026 */ this.inputs = inputs; /* 027 */ inputadapter_input = inputs[0]; /* 028 */ deserializetoobject_result = new UnsafeRow(1); /* 029 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0); /* 030 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 031 */ /* 032 */ mapelements_result = new UnsafeRow(1); /* 033 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0); /* 034 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 035 */ serializefromobject_result = new UnsafeRow(1); /* 036 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0); /* 037 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 038 */ /* 039 */ } /* 040 */ /* 041 */ protected void processNext() throws java.io.IOException { /* 042 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 043 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 044 */ int inputadapter_value = inputadapter_row.getInt(0); /* 045 */ /* 046 */ boolean mapelements_isNull = true; /* 047 */ int mapelements_value = -1; /* 048 */ if (!false) { /* 049 */ mapelements_argValue = inputadapter_value; /* 050 */ /* 051 */ mapelements_isNull = false; /* 052 */ if (!mapelements_isNull) { /* 053 */ mapelements_value = ((scala.Function1) references[0]).apply$mcII$sp(mapelements_argValue); /* 054 */ } /* 055 */ /* 056 */ } /* 057 */ /* 058 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 059 */ /* 060 */ if (mapelements_isNull) { /* 061 */ serializefromobject_rowWriter.setNullAt(0); /* 062 */ } else { /* 063 */ serializefromobject_rowWriter.write(0, mapelements_value); /* 064 */ } /* 065 */ append(serializefromobject_result); /* 066 */ if (shouldStop()) return; /* 067 */ } /* 068 */ } /* 069 */ } ``` Java bytecode for methods for `i => i * 7` ```java $ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class Compiled from "Test.scala" public final class org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable { public static final long serialVersionUID; public final int apply(int); Code: 0: aload_0 1: iload_1 2: invokevirtual #18 // Method apply$mcII$sp:(I)I 5: ireturn public int apply$mcII$sp(int); Code: 0: iload_1 1: bipush 7 3: imul 4: ireturn public final java.lang.Object apply(java.lang.Object); Code: 0: aload_0 1: aload_1 2: invokestatic apache#29 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I 5: invokevirtual apache#31 // Method apply:(I)I 8: invokestatic apache#35 // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer; 11: areturn public org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5); Code: 0: aload_0 1: invokespecial apache#42 // Method scala/runtime/AbstractFunction1$mcII$sp."<init>":()V 4: return } ``` ## How was this patch tested? Added new test suites to `DatasetPrimitiveSuite`. Author: Kazuaki Ishizaki <[email protected]> Closes apache#17172 from kiszk/SPARK-19008.
## What changes were proposed in this pull request? We need to notify the await batch lock when the stream exits early e.g., when an exception has been thrown. ## How was this patch tested? Current tests that throw exceptions at runtime will finish faster as a result of this update. zsxwing Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Tyson Condie <[email protected]> Closes apache#17231 from tcondie/kafka-writer.
…garding range() ## What changes were proposed in this pull request? This PR improves performance of operations with `range()` by changing Java code generated by Catalyst. This PR is inspired by the [blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html). This PR changes generated code in the following two points. 1. Replace a while-loop with long instance variables a for-loop with int local varibles 2. Suppress generation of `shouldStop()` method if this method is unnecessary (e.g. `append()` is not generated). These points facilitates compiler optimizations in a JIT compiler by feeding the simplified Java code into the JIT compiler. The performance is improved by 7.6x. Benchmark program: ```java val N = 1 << 29 val iters = 2 val benchmark = new Benchmark("range.count", N * iters) benchmark.addCase(s"with this PR") { i => var n = 0 var len = 0 while (n < iters) { len += sparkSession.range(N).selectExpr("count(id)").collect.length n += 1 } } benchmark.run ``` Performance result without this PR ``` OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz range.count: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ w/o this PR 1349 / 1356 796.2 1.3 1.0X ``` Performance result with this PR ``` OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz range.count: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ with this PR 177 / 271 6065.3 0.2 1.0X ``` Here is a comparison between generated code w/o and with this PR. Only the method ```agg_doAggregateWithoutKey``` is changed. Generated code without this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private boolean agg_initAgg; /* 009 */ private boolean agg_bufIsNull; /* 010 */ private long agg_bufValue; /* 011 */ private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows; /* 012 */ private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows; /* 013 */ private boolean range_initRange; /* 014 */ private long range_number; /* 015 */ private TaskContext range_taskContext; /* 016 */ private InputMetrics range_inputMetrics; /* 017 */ private long range_batchEnd; /* 018 */ private long range_numElementsTodo; /* 019 */ private scala.collection.Iterator range_input; /* 020 */ private UnsafeRow range_result; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter; /* 023 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows; /* 024 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime; /* 025 */ private UnsafeRow agg_result; /* 026 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 027 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 028 */ /* 029 */ public GeneratedIterator(Object[] references) { /* 030 */ this.references = references; /* 031 */ } /* 032 */ /* 033 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 034 */ partitionIndex = index; /* 035 */ this.inputs = inputs; /* 036 */ agg_initAgg = false; /* 037 */ /* 038 */ this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0]; /* 039 */ this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1]; /* 040 */ range_initRange = false; /* 041 */ range_number = 0L; /* 042 */ range_taskContext = TaskContext.get(); /* 043 */ range_inputMetrics = range_taskContext.taskMetrics().inputMetrics(); /* 044 */ range_batchEnd = 0; /* 045 */ range_numElementsTodo = 0L; /* 046 */ range_input = inputs[0]; /* 047 */ range_result = new UnsafeRow(1); /* 048 */ this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0); /* 049 */ this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1); /* 050 */ this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2]; /* 051 */ this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3]; /* 052 */ agg_result = new UnsafeRow(1); /* 053 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 054 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 055 */ /* 056 */ } /* 057 */ /* 058 */ private void agg_doAggregateWithoutKey() throws java.io.IOException { /* 059 */ // initialize aggregation buffer /* 060 */ agg_bufIsNull = false; /* 061 */ agg_bufValue = 0L; /* 062 */ /* 063 */ // initialize Range /* 064 */ if (!range_initRange) { /* 065 */ range_initRange = true; /* 066 */ initRange(partitionIndex); /* 067 */ } /* 068 */ /* 069 */ while (true) { /* 070 */ while (range_number != range_batchEnd) { /* 071 */ long range_value = range_number; /* 072 */ range_number += 1L; /* 073 */ /* 074 */ // do aggregate /* 075 */ // common sub-expressions /* 076 */ /* 077 */ // evaluate aggregate function /* 078 */ boolean agg_isNull1 = false; /* 079 */ /* 080 */ long agg_value1 = -1L; /* 081 */ agg_value1 = agg_bufValue + 1L; /* 082 */ // update aggregation buffer /* 083 */ agg_bufIsNull = false; /* 084 */ agg_bufValue = agg_value1; /* 085 */ /* 086 */ if (shouldStop()) return; /* 087 */ } /* 088 */ /* 089 */ if (range_taskContext.isInterrupted()) { /* 090 */ throw new TaskKilledException(); /* 091 */ } /* 092 */ /* 093 */ long range_nextBatchTodo; /* 094 */ if (range_numElementsTodo > 1000L) { /* 095 */ range_nextBatchTodo = 1000L; /* 096 */ range_numElementsTodo -= 1000L; /* 097 */ } else { /* 098 */ range_nextBatchTodo = range_numElementsTodo; /* 099 */ range_numElementsTodo = 0; /* 100 */ if (range_nextBatchTodo == 0) break; /* 101 */ } /* 102 */ range_numOutputRows.add(range_nextBatchTodo); /* 103 */ range_inputMetrics.incRecordsRead(range_nextBatchTodo); /* 104 */ /* 105 */ range_batchEnd += range_nextBatchTodo * 1L; /* 106 */ } /* 107 */ /* 108 */ } /* 109 */ /* 110 */ private void initRange(int idx) { /* 111 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx); /* 112 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L); /* 113 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L); /* 114 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L); /* 115 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L); /* 117 */ /* 118 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start); /* 119 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 120 */ range_number = Long.MAX_VALUE; /* 121 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 122 */ range_number = Long.MIN_VALUE; /* 123 */ } else { /* 124 */ range_number = st.longValue(); /* 125 */ } /* 126 */ range_batchEnd = range_number; /* 127 */ /* 128 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice) /* 129 */ .multiply(step).add(start); /* 130 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 131 */ partitionEnd = Long.MAX_VALUE; /* 132 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 133 */ partitionEnd = Long.MIN_VALUE; /* 134 */ } else { /* 135 */ partitionEnd = end.longValue(); /* 136 */ } /* 137 */ /* 138 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract( /* 139 */ java.math.BigInteger.valueOf(range_number)); /* 140 */ range_numElementsTodo = startToEnd.divide(step).longValue(); /* 141 */ if (range_numElementsTodo < 0) { /* 142 */ range_numElementsTodo = 0; /* 143 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) { /* 144 */ range_numElementsTodo++; /* 145 */ } /* 146 */ } /* 147 */ /* 148 */ protected void processNext() throws java.io.IOException { /* 149 */ while (!agg_initAgg) { /* 150 */ agg_initAgg = true; /* 151 */ long agg_beforeAgg = System.nanoTime(); /* 152 */ agg_doAggregateWithoutKey(); /* 153 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000); /* 154 */ /* 155 */ // output the result /* 156 */ /* 157 */ agg_numOutputRows.add(1); /* 158 */ agg_rowWriter.zeroOutNullBytes(); /* 159 */ /* 160 */ if (agg_bufIsNull) { /* 161 */ agg_rowWriter.setNullAt(0); /* 162 */ } else { /* 163 */ agg_rowWriter.write(0, agg_bufValue); /* 164 */ } /* 165 */ append(agg_result); /* 166 */ } /* 167 */ } /* 168 */ } ``` Generated code with this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private boolean agg_initAgg; /* 009 */ private boolean agg_bufIsNull; /* 010 */ private long agg_bufValue; /* 011 */ private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows; /* 012 */ private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows; /* 013 */ private boolean range_initRange; /* 014 */ private long range_number; /* 015 */ private TaskContext range_taskContext; /* 016 */ private InputMetrics range_inputMetrics; /* 017 */ private long range_batchEnd; /* 018 */ private long range_numElementsTodo; /* 019 */ private scala.collection.Iterator range_input; /* 020 */ private UnsafeRow range_result; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter; /* 023 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows; /* 024 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime; /* 025 */ private UnsafeRow agg_result; /* 026 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 027 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 028 */ /* 029 */ public GeneratedIterator(Object[] references) { /* 030 */ this.references = references; /* 031 */ } /* 032 */ /* 033 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 034 */ partitionIndex = index; /* 035 */ this.inputs = inputs; /* 036 */ agg_initAgg = false; /* 037 */ /* 038 */ this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0]; /* 039 */ this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1]; /* 040 */ range_initRange = false; /* 041 */ range_number = 0L; /* 042 */ range_taskContext = TaskContext.get(); /* 043 */ range_inputMetrics = range_taskContext.taskMetrics().inputMetrics(); /* 044 */ range_batchEnd = 0; /* 045 */ range_numElementsTodo = 0L; /* 046 */ range_input = inputs[0]; /* 047 */ range_result = new UnsafeRow(1); /* 048 */ this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0); /* 049 */ this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1); /* 050 */ this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2]; /* 051 */ this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3]; /* 052 */ agg_result = new UnsafeRow(1); /* 053 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 054 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 055 */ /* 056 */ } /* 057 */ /* 058 */ private void agg_doAggregateWithoutKey() throws java.io.IOException { /* 059 */ // initialize aggregation buffer /* 060 */ agg_bufIsNull = false; /* 061 */ agg_bufValue = 0L; /* 062 */ /* 063 */ // initialize Range /* 064 */ if (!range_initRange) { /* 065 */ range_initRange = true; /* 066 */ initRange(partitionIndex); /* 067 */ } /* 068 */ /* 069 */ while (true) { /* 070 */ long range_range = range_batchEnd - range_number; /* 071 */ if (range_range != 0L) { /* 072 */ int range_localEnd = (int)(range_range / 1L); /* 073 */ for (int range_localIdx = 0; range_localIdx < range_localEnd; range_localIdx++) { /* 074 */ long range_value = ((long)range_localIdx * 1L) + range_number; /* 075 */ /* 076 */ // do aggregate /* 077 */ // common sub-expressions /* 078 */ /* 079 */ // evaluate aggregate function /* 080 */ boolean agg_isNull1 = false; /* 081 */ /* 082 */ long agg_value1 = -1L; /* 083 */ agg_value1 = agg_bufValue + 1L; /* 084 */ // update aggregation buffer /* 085 */ agg_bufIsNull = false; /* 086 */ agg_bufValue = agg_value1; /* 087 */ /* 088 */ // shouldStop check is eliminated /* 089 */ } /* 090 */ range_number = range_batchEnd; /* 091 */ } /* 092 */ /* 093 */ if (range_taskContext.isInterrupted()) { /* 094 */ throw new TaskKilledException(); /* 095 */ } /* 096 */ /* 097 */ long range_nextBatchTodo; /* 098 */ if (range_numElementsTodo > 1000L) { /* 099 */ range_nextBatchTodo = 1000L; /* 100 */ range_numElementsTodo -= 1000L; /* 101 */ } else { /* 102 */ range_nextBatchTodo = range_numElementsTodo; /* 103 */ range_numElementsTodo = 0; /* 104 */ if (range_nextBatchTodo == 0) break; /* 105 */ } /* 106 */ range_numOutputRows.add(range_nextBatchTodo); /* 107 */ range_inputMetrics.incRecordsRead(range_nextBatchTodo); /* 108 */ /* 109 */ range_batchEnd += range_nextBatchTodo * 1L; /* 110 */ } /* 111 */ /* 112 */ } /* 113 */ /* 114 */ private void initRange(int idx) { /* 115 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx); /* 116 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L); /* 117 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L); /* 118 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L); /* 119 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L); /* 120 */ long partitionEnd; /* 121 */ /* 122 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start); /* 123 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 124 */ range_number = Long.MAX_VALUE; /* 125 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 126 */ range_number = Long.MIN_VALUE; /* 127 */ } else { /* 128 */ range_number = st.longValue(); /* 129 */ } /* 130 */ range_batchEnd = range_number; /* 131 */ /* 132 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice) /* 133 */ .multiply(step).add(start); /* 134 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 135 */ partitionEnd = Long.MAX_VALUE; /* 136 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 137 */ partitionEnd = Long.MIN_VALUE; /* 138 */ } else { /* 139 */ partitionEnd = end.longValue(); /* 140 */ } /* 141 */ /* 142 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract( /* 143 */ java.math.BigInteger.valueOf(range_number)); /* 144 */ range_numElementsTodo = startToEnd.divide(step).longValue(); /* 145 */ if (range_numElementsTodo < 0) { /* 146 */ range_numElementsTodo = 0; /* 147 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) { /* 148 */ range_numElementsTodo++; /* 149 */ } /* 150 */ } /* 151 */ /* 152 */ protected void processNext() throws java.io.IOException { /* 153 */ while (!agg_initAgg) { /* 154 */ agg_initAgg = true; /* 155 */ long agg_beforeAgg = System.nanoTime(); /* 156 */ agg_doAggregateWithoutKey(); /* 157 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000); /* 158 */ /* 159 */ // output the result /* 160 */ /* 161 */ agg_numOutputRows.add(1); /* 162 */ agg_rowWriter.zeroOutNullBytes(); /* 163 */ /* 164 */ if (agg_bufIsNull) { /* 165 */ agg_rowWriter.setNullAt(0); /* 166 */ } else { /* 167 */ agg_rowWriter.write(0, agg_bufValue); /* 168 */ } /* 169 */ append(agg_result); /* 170 */ } /* 171 */ } /* 172 */ } ``` A part of suppressing `shouldStop()` was originally developed by inouehrs ## How was this patch tested? Add new tests into `DataFrameRangeSuite` Author: Kazuaki Ishizaki <[email protected]> Closes apache#17122 from kiszk/SPARK-19786.
…cal plan
## What changes were proposed in this pull request?
When adaptive execution is enabled, an exchange coordinator is used in the Exchange operators. For Join, the same exchange coordinator is used for its two Exchanges. But the physical plan shows two different coordinator Ids which is confusing.
This PR is to fix the incorrect exchange coordinator id in the physical plan. The coordinator object instead of the `Option[ExchangeCoordinator]` should be used to generate the identity hash code of the same coordinator.
## How was this patch tested?
Before the patch, the physical plan shows two different exchange coordinator id for Join.
```
== Physical Plan ==
*Project [key1#3L, value2#12L]
+- *SortMergeJoin [key1#3L], [key2#11L], Inner
:- *Sort [key1#3L ASC NULLS FIRST], false, 0
: +- Exchange(coordinator id: 1804587700) hashpartitioning(key1#3L, 10), coordinator[target post-shuffle partition size: 67108864]
: +- *Project [(id#0L % 500) AS key1#3L]
: +- *Filter isnotnull((id#0L % 500))
: +- *Range (0, 1000, step=1, splits=Some(10))
+- *Sort [key2#11L ASC NULLS FIRST], false, 0
+- Exchange(coordinator id: 793927319) hashpartitioning(key2#11L, 10), coordinator[target post-shuffle partition size: 67108864]
+- *Project [(id#8L % 500) AS key2#11L, id#8L AS value2#12L]
+- *Filter isnotnull((id#8L % 500))
+- *Range (0, 1000, step=1, splits=Some(10))
```
After the patch, two exchange coordinator id are the same.
Author: Carson Wang <[email protected]>
Closes apache#16952 from carsonwang/FixCoordinatorId.
…SPARK_JAVA_OPTS This fix removes deprecated support for config `SPARK_YARN_USER_ENV`, as is mentioned in SPARK-17979. This fix also removes deprecated support for the following: ``` SPARK_YARN_USER_ENV SPARK_JAVA_OPTS SPARK_CLASSPATH SPARK_WORKER_INSTANCES ``` Related JIRA: [SPARK-14453]: https://issues.apache.org/jira/browse/SPARK-14453 [SPARK-12344]: https://issues.apache.org/jira/browse/SPARK-12344 [SPARK-15781]: https://issues.apache.org/jira/browse/SPARK-15781 Existing tests should pass. Author: Yong Tang <[email protected]> Closes apache#17212 from yongtang/SPARK-17979.
…ed schema ## What changes were proposed in this pull request? The ```HiveMetastoreCatalog.mergeWithMetastoreSchema()``` method added in apache#16944 may not preserve the same field order as the metastore schema in some cases, which can cause queries to fail. This change ensures that the metastore field order is preserved. ## How was this patch tested? A test for ensuring that metastore order is preserved was added to ```HiveSchemaInferenceSuite.``` The particular failure usecase from apache#16944 was tested manually as well. Author: Budde <[email protected]> Closes apache#17249 from budde/PreserveMetastoreFieldOrder.
## What changes were proposed in this pull request? `Dataset.inputFiles` works by matching `FileRelation`s in the query plan. In Spark 2.1, Hive SerDe tables are represented by `MetastoreRelation`, which inherits from `FileRelation`. However, in Spark 2.2, Hive SerDe tables are now represented by `CatalogRelation`, which doesn't inherit from `FileRelation` anymore, due to the unification of Hive SerDe tables and data source tables. This change breaks `Dataset.inputFiles` for Hive SerDe tables. This PR tries to fix this issue by explicitly matching `CatalogRelation`s that are Hive SerDe tables in `Dataset.inputFiles`. Note that we can't make `CatalogRelation` inherit from `FileRelation` since not all `CatalogRelation`s are file based (e.g., JDBC data source tables). ## How was this patch tested? New test case added in `HiveDDLSuite`. Author: Cheng Lian <[email protected]> Closes apache#17247 from liancheng/spark-19905-hive-table-input-files.
## What changes were proposed in this pull request? In spark SQL, map type can't be used in equality test/comparison, and `Intersect`/`Except`/`Distinct` do need equality test for all columns, we should not allow map type in `Intersect`/`Except`/`Distinct`. ## How was this patch tested? new regression test Author: Wenchen Fan <[email protected]> Closes apache#17236 from cloud-fan/map.
…ion should work ## What changes were proposed in this pull request? This JIRA is a follow up work after [SPARK-19583](https://issues.apache.org/jira/browse/SPARK-19583) As we discussed in that [PR](apache#16938) The following DDL for datasource table with an non-existent location should work: ``` CREATE TABLE ... (PARTITIONED BY ...) LOCATION path ``` Currently it will throw exception that path not exists for datasource table for datasource table ## How was this patch tested? unit test added Author: windpiger <[email protected]> Closes apache#17055 from windpiger/CTDataSourcePathNotExists.
update from origin
## What changes were proposed in this pull request? - SS python example: `TypeError: 'xxx' object is not callable` - some other doc issue. ## How was this patch tested? Jenkins. Author: uncleGen <[email protected]> Closes apache#17257 from uncleGen/docs-ss-python.
… up the directories of finished applications to avoid the block Cleaning the application may cost much time at worker, then it will block that the worker send heartbeats master because the worker is extend ThreadSafeRpcEndpoint. If the heartbeat from a worker is blocked by the message ApplicationFinished, master will think the worker is dead. If the worker has a driver, the driver will be scheduled by master again. It had better reuse the existing cleanupThreadExecutor to clean up the directories of finished applications to avoid the block. Author: xiaojian.fxj <[email protected]> Closes apache#17189 from hustfxj/worker-hearbeat.
… param "maxDepth" to R models ## What changes were proposed in this pull request? RandomForest R Wrapper and GBT R Wrapper return param `maxDepth` to R models. Below 4 R wrappers are changed: * `RandomForestClassificationWrapper` * `RandomForestRegressionWrapper` * `GBTClassificationWrapper` * `GBTRegressionWrapper` ## How was this patch tested? Test manually on my local machine. Author: Xin Ren <[email protected]> Closes apache#17207 from keypointt/SPARK-19282.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.