@@ -63,6 +63,32 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S
6363 }
6464}
6565
66+ class RunningCountStatefulProcessorInt extends StatefulProcessor [String , String , (String , String )]
67+ with Logging {
68+ @ transient protected var _countState : ValueState [Int ] = _
69+
70+ override def init (
71+ outputMode : OutputMode ,
72+ timeMode : TimeMode ): Unit = {
73+ _countState = getHandle.getValueState[Int ](" countState" , Encoders .scalaInt)
74+ }
75+
76+ override def handleInputRows (
77+ key : String ,
78+ inputRows : Iterator [String ],
79+ timerValues : TimerValues ,
80+ expiredTimerInfo : ExpiredTimerInfo ): Iterator [(String , String )] = {
81+ val count = _countState.getOption().getOrElse(0 ) + 1
82+ if (count == 3 ) {
83+ _countState.clear()
84+ Iterator .empty
85+ } else {
86+ _countState.update(count)
87+ Iterator ((key, count.toString))
88+ }
89+ }
90+ }
91+
6692// Class to verify stateful processor usage with adding processing time timers
6793class RunningCountStatefulProcessorWithProcTimeTimer extends RunningCountStatefulProcessor {
6894 private def handleProcessingTimeBasedTimers (
@@ -1036,6 +1062,42 @@ class TransformWithStateSuite extends StateStoreMetricsTest
10361062 }
10371063 }
10381064 }
1065+
1066+ // TODO: Enable this test and expect error to be thrown when
1067+ // github.com/apache/spark/pull/47257 is merged
1068+ ignore(" test that invalid schema evolution fails query for column family" ) {
1069+ withSQLConf(SQLConf .STATE_STORE_PROVIDER_CLASS .key ->
1070+ classOf [RocksDBStateStoreProvider ].getName,
1071+ SQLConf .SHUFFLE_PARTITIONS .key ->
1072+ TransformWithStateSuiteUtils .NUM_SHUFFLE_PARTITIONS .toString) {
1073+ withTempDir { checkpointDir =>
1074+ val inputData = MemoryStream [String ]
1075+ val result1 = inputData.toDS()
1076+ .groupByKey(x => x)
1077+ .transformWithState(new RunningCountStatefulProcessor (),
1078+ TimeMode .None (),
1079+ OutputMode .Update ())
1080+
1081+ testStream(result1, OutputMode .Update ())(
1082+ StartStream (checkpointLocation = checkpointDir.getCanonicalPath),
1083+ AddData (inputData, " a" ),
1084+ CheckNewAnswer ((" a" , " 1" )),
1085+ StopStream
1086+ )
1087+ val result2 = inputData.toDS()
1088+ .groupByKey(x => x)
1089+ .transformWithState(new RunningCountStatefulProcessorInt (),
1090+ TimeMode .None (),
1091+ OutputMode .Update ())
1092+ testStream(result2, OutputMode .Update ())(
1093+ StartStream (checkpointLocation = checkpointDir.getCanonicalPath),
1094+ AddData (inputData, " a" ),
1095+ CheckNewAnswer ((" a" , " 2" )),
1096+ StopStream
1097+ )
1098+ }
1099+ }
1100+ }
10391101}
10401102
10411103class TransformWithStateValidationSuite extends StateStoreMetricsTest {
0 commit comments