-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-19690][SS] Join a streaming DataFrame with a batch DataFrame which has an aggregation may not work #17052
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Test build #73407 has started for PR 17052 at commit |
Test build #73403 has finished for PR 17052 at commit
|
Test build #73412 has finished for PR 17052 at commit
|
cc @zsxwing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stateful
indicates if the aggregate is base on streaming or batch, resolved by ResolveStatefulAggregate
rule
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resolve one aggregate, determine statefule or not.
Thanks for doing this. I'm wondering if you can fix |
@zsxwing got it |
9c15fcb
to
38e3a14
Compare
retest this please. |
Test build #73507 has finished for PR 17052 at commit
|
working on unit test failure |
c87651a
to
59f4272
Compare
retest this please. |
Test build #73559 has finished for PR 17052 at commit
|
Test build #73558 has finished for PR 17052 at commit
|
Test build #73571 has started for PR 17052 at commit |
SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1" | ||
SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1", | ||
SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key -> "false" | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Close the "UNSUPPORTED_OPERATION_CHECK_ENABLED", as Source.getBatch
returns DF whose isStreaming
is true.
} else { | ||
LocalRelation(projectList.map(_.toAttribute), data.map(projection)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a streaming query, we will transfrom stream source to a batch LocalRelation
whose isStreaming
is true, so we should keep new LocalRelation's isStreaming
is true in this rule.
case agg @ PhysicalAggregation( | ||
namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) | ||
if agg.isStreaming => | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apply this strategy only if the logical plan is streaming.
retest this please. |
Test build #73576 has finished for PR 17052 at commit
|
\cc @zsxwing |
ping @zsxwing |
|
||
private var _analyzed: Boolean = false | ||
|
||
private var _incremental: Boolean = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding it here will break sameResult
, equals
and other methods. Could you add a new parameter to the constructor of LogicalRelation
and LogicalRDD
instead?
c87651a
to
67847e5
Compare
case localRelation @ LocalRelation(_, _, false) => | ||
localRelation.dataFromStreaming = true | ||
localRelation | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a new parameter dataFromStreaming
to the constructor of LogicalRelation, LogicalRDD and LocalRelation. dataFromStreaming
indicate if this relation comes from a streaming source. In a streaming query, stream relation will be cut into a series of batch relations.
Test build #74099 has finished for PR 17052 at commit
|
Test build #74100 has finished for PR 17052 at commit
|
Test build #74109 has finished for PR 17052 at commit
|
Test build #74206 has finished for PR 17052 at commit
|
Test build #74844 has finished for PR 17052 at commit
|
retest this please. |
Test build #74852 has finished for PR 17052 at commit
|
retest this please. |
Test build #74968 has finished for PR 17052 at commit
|
This is marked as "Critical" for 2.1.1, but I'm not clear it's a regression or that urgent? |
@uncleGen is this still active? |
@HyukjinKwon Sorry! Busy for this period of time. Let me resolve this conflict. |
Yea, I just wanted to check if it is in progress in any way. Thanks for your input. |
Test build #78895 has finished for PR 17052 at commit
|
What changes were proposed in this pull request?
StatefulAggregationStrategy
should check logicplan is streaming or notTest code:
before pr:
after pr:
How was this patch tested?
add new unit test.