Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,37 @@ object UnsupportedOperationChecker {
subplan.collect { case a: Aggregate if a.isStreaming => a }
}

val mapGroupsWithStates = plan.collect {
case f: FlatMapGroupsWithState if f.isStreaming && f.isMapGroupsWithState => f
}

// Disallow multiple `mapGroupsWithState`s.
if (mapGroupsWithStates.size >= 2) {
throwError(
"Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets")(plan)
}

val flatMapGroupsWithStates = plan.collect {
case f: FlatMapGroupsWithState if f.isStreaming && !f.isMapGroupsWithState => f
}

// Disallow mixing `mapGroupsWithState`s and `flatMapGroupsWithState`s
if (mapGroupsWithStates.nonEmpty && flatMapGroupsWithStates.nonEmpty) {
throwError(
"Mixing mapGroupsWithStates and flatMapGroupsWithStates are not supported on a " +
"streaming DataFrames/Datasets")(plan)
}

// Only allow multiple `FlatMapGroupsWithState(Append)`s in append mode.
if (flatMapGroupsWithStates.size >= 2 && (
outputMode != InternalOutputModes.Append ||
flatMapGroupsWithStates.exists(_.outputMode != InternalOutputModes.Append)
)) {
throwError(
"Multiple flatMapGroupsWithStates are not supported when they are not all in append mode" +
" or the output mode is not append on a streaming DataFrames/Datasets")(plan)
}

// Disallow multiple streaming aggregations
val aggregates = collectStreamingAggregates(plan)

Expand Down Expand Up @@ -116,9 +147,49 @@ object UnsupportedOperationChecker {
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
"streaming DataFrames/Datasets")

case m: MapGroupsWithState if collectStreamingAggregates(m).nonEmpty =>
throwError("(map/flatMap)GroupsWithState is not supported after aggregation on a " +
"streaming DataFrame/Dataset")
// mapGroupsWithState: Allowed only when no aggregation + Update output mode
case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
if (collectStreamingAggregates(plan).isEmpty) {
if (outputMode != InternalOutputModes.Update) {
throwError("mapGroupsWithState is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
} else {
// Allowed when no aggregation + Update output mode
}
} else {
throwError("mapGroupsWithState is not supported with aggregation " +
"on a streaming DataFrame/Dataset")
}

// flatMapGroupsWithState without aggregation
case m: FlatMapGroupsWithState
if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
m.outputMode match {
case InternalOutputModes.Update =>
if (outputMode != InternalOutputModes.Update) {
throwError("flatMapGroupsWithState in update mode is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
}
case InternalOutputModes.Append =>
if (outputMode != InternalOutputModes.Append) {
throwError("flatMapGroupsWithState in append mode is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
}
}

// flatMapGroupsWithState(Update) with aggregation
case m: FlatMapGroupsWithState
if m.isStreaming && m.outputMode == InternalOutputModes.Update
&& collectStreamingAggregates(plan).nonEmpty =>
throwError("flatMapGroupsWithState in update mode is not supported with " +
"aggregation on a streaming DataFrame/Dataset")

// flatMapGroupsWithState(Append) with aggregation
case m: FlatMapGroupsWithState
if m.isStreaming && m.outputMode == InternalOutputModes.Append
&& collectStreamingAggregates(m).nonEmpty =>
throwError("flatMapGroupsWithState in append mode is not supported after " +
s"aggregation on a streaming DataFrame/Dataset")

case d: Deduplicate if collectStreamingAggregates(d).nonEmpty =>
throwError("dropDuplicates is not supported after aggregation on a " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._

object CatalystSerde {
Expand Down Expand Up @@ -317,13 +318,15 @@ case class MapGroups(
trait LogicalKeyedState[S]

/** Factory for constructing new `MapGroupsWithState` nodes. */
object MapGroupsWithState {
object FlatMapGroupsWithState {
def apply[K: Encoder, V: Encoder, S: Encoder, U: Encoder](
func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any],
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
outputMode: OutputMode,
isMapGroupsWithState: Boolean,
child: LogicalPlan): LogicalPlan = {
val mapped = new MapGroupsWithState(
val mapped = new FlatMapGroupsWithState(
func,
UnresolvedDeserializer(encoderFor[K].deserializer, groupingAttributes),
UnresolvedDeserializer(encoderFor[V].deserializer, dataAttributes),
Expand All @@ -332,7 +335,9 @@ object MapGroupsWithState {
CatalystSerde.generateObjAttr[U],
encoderFor[S].resolveAndBind().deserializer,
encoderFor[S].namedExpressions,
child)
outputMode,
child,
isMapGroupsWithState)
CatalystSerde.serialize[U](mapped)
}
}
Expand All @@ -350,8 +355,10 @@ object MapGroupsWithState {
* @param outputObjAttr used to define the output object
* @param stateDeserializer used to deserialize state before calling `func`
* @param stateSerializer used to serialize updated state after calling `func`
* @param outputMode the output mode of `func`
* @param isMapGroupsWithState whether it is created by the `mapGroupsWithState` method
*/
case class MapGroupsWithState(
case class FlatMapGroupsWithState(
func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any],
keyDeserializer: Expression,
valueDeserializer: Expression,
Expand All @@ -360,7 +367,14 @@ case class MapGroupsWithState(
outputObjAttr: Attribute,
stateDeserializer: Expression,
stateSerializer: Seq[NamedExpression],
child: LogicalPlan) extends UnaryNode with ObjectProducer
outputMode: OutputMode,
child: LogicalPlan,
isMapGroupsWithState: Boolean = false) extends UnaryNode with ObjectProducer {

if (isMapGroupsWithState) {
assert(outputMode == OutputMode.Update)
}
}

/** Factory for constructing new `FlatMapGroupsInR` nodes. */
object FlatMapGroupsInR {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,19 @@ private[sql] object InternalOutputModes {
* aggregations, it will be equivalent to `Append` mode.
*/
case object Update extends OutputMode


def apply(outputMode: String): OutputMode = {
outputMode.toLowerCase match {
case "append" =>
OutputMode.Append
case "complete" =>
OutputMode.Complete
case "update" =>
OutputMode.Update
case _ =>
throw new IllegalArgumentException(s"Unknown output mode $outputMode. " +
"Accepted output modes are 'append', 'complete', 'update'")
}
}
}
Loading