Commit 5458d30

tests pass
1 parent 50a68ca commit 5458d30

3 files changed: +19 -47 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala

Lines changed: 16 additions & 6 deletions
```diff
@@ -205,12 +205,22 @@ class IncrementalExecution(
             currentBatchId,
             stateSchemaVersion)
           // write out the state schema paths to the metadata file
-          val metadata = stateStoreWriter.operatorStateMetadata()
-          // TODO: Populate metadata with stateSchemaPaths if metadata version is v2
-          val metadataWriter = new OperatorStateMetadataWriter(new Path(
-            checkpointLocation, stateStoreWriter.getStateInfo.operatorId.toString), hadoopConf)
-          metadataWriter.write(metadata)
-          stateStoreWriter
+          val metadata = stateStoreWriter.operatorStateMetadata(stateSchemaPaths)
+
+          stateStoreWriter match {
+            case tws: TransformWithStateExec =>
+              val metadataPath = OperatorStateMetadataV2.metadataFilePath(new Path(
+                checkpointLocation, tws.getStateInfo.operatorId.toString))
+              val operatorStateMetadataLog = new OperatorStateMetadataLog(sparkSession,
+                metadataPath.toString)
+              operatorStateMetadataLog.add(currentBatchId, metadata)
+              tws
+            case _ =>
+              val metadataWriter = new OperatorStateMetadataWriter(new Path(
+                checkpointLocation, stateStoreWriter.getStateInfo.operatorId.toString), hadoopConf)
+              metadataWriter.write(metadata)
+              stateStoreWriter
+          }
         case statefulOp: StatefulOperator if isFirstBatch =>
           statefulOp.
             validateAndMaybeEvolveStateSchema(hadoopConf, currentBatchId, stateSchemaVersion = 2)
```
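With this change, a TransformWithStateExec operator commits its metadata (including the state schema file paths) through an OperatorStateMetadataLog keyed by batch id, instead of overwriting a single file via OperatorStateMetadataWriter. A minimal reader-side sketch of that log, mirroring the lookup the test suite below performs; it assumes `checkpointLocation`, `operatorId`, and `spark` are in scope, and that `getLatest()` follows the usual HDFSMetadataLog signature of `Option[(Long, T)]`:

```scala
import org.apache.hadoop.fs.Path

// Resolve the metadata log location for one stateful operator.
val metadataPath = OperatorStateMetadataV2.metadataFilePath(
  new Path(checkpointLocation, operatorId.toString))
val metadataLog = new OperatorStateMetadataLog(spark, metadataPath.toString)

// getLatest() returns the newest committed (batchId, metadata) pair, if any.
metadataLog.getLatest() match {
  case Some((batchId, metadata: OperatorStateMetadataV2)) =>
    // Each batch's entry records where that batch wrote its state schema file.
    val schemaFile = metadata.stateStoreInfo.head.stateSchemaFilePath
    println(s"batch $batchId: state schema at $schemaFile")
  case _ =>
    println("no operator metadata committed yet")
}
```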

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.HDFSMetadataLog
+import org.apache.spark.sql.internal.SQLConf

 object StateSchemaV3File {
   val COLUMN_FAMILY_SCHEMA_VERSION = 1
```

sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala

Lines changed: 2 additions & 41 deletions
```diff
@@ -28,7 +28,7 @@ import org.apache.spark.sql.{Dataset, Encoders, Row}
 import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.KEY_ROW_SCHEMA
-import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, RocksDBStateStoreProvider, StatefulProcessorCannotPerformOperationWithInvalidHandleState, StateSchemaV3File, StateStoreMultipleColumnFamiliesNotSupportedException}
+import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, OperatorStateMetadataV2, RocksDBStateStoreProvider, StatefulProcessorCannotPerformOperationWithInvalidHandleState, StateSchemaV3File, StateStoreMultipleColumnFamiliesNotSupportedException}
 import org.apache.spark.sql.functions.timestamp_seconds
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.StreamManualClock
@@ -805,7 +805,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       getLatest().get._2.
         asInstanceOf[OperatorStateMetadataV2].
         stateStoreInfo.head.stateSchemaFilePath
-    fetchStateSchemaV3File(checkpointDir, operatorId).get(new Path(stateSchemaFilePath))
+    fetchStateSchemaV3File(checkpointDir, operatorId).getWithPath(new Path(stateSchemaFilePath))
   }

   private def fetchStateSchemaV3File(
@@ -900,45 +900,6 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }

-  test("transformWithState - verify that StateSchemaV3 files are purged") {
-    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
-      classOf[RocksDBStateStoreProvider].getName,
-      SQLConf.SHUFFLE_PARTITIONS.key ->
-        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
-      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
-      withTempDir { chkptDir =>
-        val inputData = MemoryStream[String]
-        val result = inputData.toDS()
-          .groupByKey(x => x)
-          .transformWithState(new RunningCountStatefulProcessor(),
-            TimeMode.None(),
-            OutputMode.Update())
-
-        testStream(result, OutputMode.Update())(
-          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
-          AddData(inputData, "a"),
-          CheckNewAnswer(("a", "1")),
-          StopStream,
-          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
-          AddData(inputData, "a"),
-          CheckNewAnswer(("a", "2")),
-          StopStream,
-          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
-          AddData(inputData, "a"),
-          CheckNewAnswer(),
-          StopStream
-        )
-        // If the StateSchemaV3 files are not purged, there would be
-        // three files, but we should have only one file.
-        val batchesWithSchemaV3File = fetchStateSchemaV3File(
-          chkptDir.getCanonicalPath, 0).listBatchesOnDisk
-        assert(batchesWithSchemaV3File.length == 1)
-        // Make sure that only the latest batch has the schema file
-        assert(batchesWithSchemaV3File.head == 2)
-      }
-    }
-  }
-
   test("transformWithState - verify that OperatorStateMetadataV2" +
     " file is being written correctly") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
```
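The rename from get to getWithPath in the suite suggests that StateSchemaV3File keeps the batch-keyed get(batchId) it would inherit from an HDFSMetadataLog-style base, so the path-based lookup needs a distinct name. A hedged end-to-end sketch of the suite's lookup, assuming getWithPath deserializes the column-family schemas stored at the recorded path (the return type, the Int operator id, and the in-scope `spark` and `fetchStateSchemaV3File` helper are all guesses modeled on the suite):

```scala
import org.apache.hadoop.fs.Path

// Hypothetical helper mirroring the suite: resolve the latest batch's schema
// file path from the operator metadata log, then read that file back directly.
def latestColumnFamilySchemas(
    checkpointDir: String,
    operatorId: Int): List[ColumnFamilySchema] = {
  val metadataPath = OperatorStateMetadataV2.metadataFilePath(
    new Path(checkpointDir, operatorId.toString))
  val metadataLog = new OperatorStateMetadataLog(spark, metadataPath.toString)
  val stateSchemaFilePath = metadataLog.getLatest().get._2
    .asInstanceOf[OperatorStateMetadataV2]
    .stateStoreInfo.head.stateSchemaFilePath
  // Path-based lookup: read the exact schema file the metadata points at,
  // rather than resolving a batch id against the log directory.
  fetchStateSchemaV3File(checkpointDir, operatorId)
    .getWithPath(new Path(stateSchemaFilePath))
}
```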
