Skip to content

Commit 40b6dc6

Browse files
move error to StateStoreErrors
1 parent 23639f4 commit 40b6dc6

File tree

3 files changed

+14
-10
lines changed

3 files changed

+14
-10
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.spark.sql.execution.datasources.v2.state
1818

19-
import org.apache.spark.SparkException
2019
import org.apache.spark.internal.Logging
2120
import org.apache.spark.sql.catalyst.InternalRow
2221
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -99,10 +98,8 @@ class StatePartitionReader(
9998

10099
case Some(snapshotStartBatchId) =>
101100
if (!provider.isInstanceOf[SupportsFineGrainedReplayFromSnapshot]) {
102-
throw new SparkException(
103-
errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY",
104-
messageParameters = Map("inputClass" -> provider.getClass.toString),
105-
cause = null)
101+
StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
102+
provider.getClass.toString)
106103
}
107104
provider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot]
108105
.replayReadStateFromSnapshot(

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ object StateStoreErrors {
160160
StateStoreSnapshotPartitionNotFound = {
161161
new StateStoreSnapshotPartitionNotFound(snapshotPartitionId, operatorId, checkpointLocation)
162162
}
163+
164+
def stateStoreProviderDoesNotSupportFineGrainedReplay(inputClass: String):
165+
StateStoreProviderDoesNotSupportFineGrainedReplay = {
166+
new StateStoreProviderDoesNotSupportFineGrainedReplay(inputClass)
167+
}
163168
}
164169

165170
class StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider: String)
@@ -291,3 +296,8 @@ class StateStoreValueRowFormatValidationFailure(errorMsg: String)
291296
extends SparkRuntimeException(
292297
errorClass = "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE",
293298
messageParameters = Map("errorMsg" -> errorMsg))
299+
300+
class StateStoreProviderDoesNotSupportFineGrainedReplay(inputClass: String)
301+
extends SparkUnsupportedOperationException(
302+
errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY",
303+
messageParameters = Map("inputClass" -> inputClass))

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import scala.annotation.tailrec
2323

2424
import org.apache.hadoop.conf.Configuration
2525

26-
import org.apache.spark.SparkException
2726
import org.apache.spark.TaskContext
2827
import org.apache.spark.internal.{Logging, MDC}
2928
import org.apache.spark.internal.LogKeys.{END_INDEX, START_INDEX, STATE_STORE_ID}
@@ -495,10 +494,8 @@ class SymmetricHashJoinStateManager(
495494
useMultipleValuesPerKey = false)
496495
if (snapshotStartVersion.isDefined) {
497496
if (!stateStoreProvider.isInstanceOf[SupportsFineGrainedReplayFromSnapshot]) {
498-
throw new SparkException(
499-
errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY",
500-
messageParameters = Map("inputClass" -> stateStoreProvider.getClass.toString),
501-
cause = null)
497+
StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
498+
stateStoreProvider.getClass.toString)
502499
}
503500
stateStoreProvider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot]
504501
.replayStateFromSnapshot(snapshotStartVersion.get, stateInfo.get.storeVersion)

0 commit comments

Comments
 (0)