Skip to content

Commit c8d06d0

Browse files
committed
renaming to suffix key
1 parent db2f304 commit c8d06d0

File tree

3 files changed

+13
-13
lines changed

3 files changed

+13
-13
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class StateStoreColumnFamilySchemaUtils(initializeAvroSerde: Boolean) {
6262
private def getAvroSerde(
6363
keySchema: StructType,
6464
valSchema: StructType,
65-
userKeySchema: Option[StructType] = None
65+
suffixKeySchema: Option[StructType] = None
6666
): Option[AvroEncoderSpec] = {
6767
if (initializeAvroSerde) {
6868
val avroType = SchemaConverters.toAvroType(valSchema)
@@ -76,18 +76,18 @@ class StateStoreColumnFamilySchemaUtils(initializeAvroSerde: Boolean) {
7676
val valueDeserializer = new AvroDeserializer(avroType, valSchema,
7777
avroOptions.datetimeRebaseModeInRead, avroOptions.useStableIdForUnionType,
7878
avroOptions.stableIdPrefixForUnionType, avroOptions.recursiveFieldMaxDepth)
79-
val (userKeySerializer, userKeyDeserializer) = if (userKeySchema.isDefined) {
80-
val userKeyAvroType = SchemaConverters.toAvroType(userKeySchema.get)
81-
val ukSer = new AvroSerializer(userKeySchema.get, userKeyAvroType, nullable = false)
82-
val ukDe = new AvroDeserializer(userKeyAvroType, userKeySchema.get,
79+
val (suffixKeySer, suffixKeyDe) = if (suffixKeySchema.isDefined) {
80+
val userKeyAvroType = SchemaConverters.toAvroType(suffixKeySchema.get)
81+
val skSer = new AvroSerializer(suffixKeySchema.get, userKeyAvroType, nullable = false)
82+
val skDe = new AvroDeserializer(userKeyAvroType, suffixKeySchema.get,
8383
avroOptions.datetimeRebaseModeInRead, avroOptions.useStableIdForUnionType,
8484
avroOptions.stableIdPrefixForUnionType, avroOptions.recursiveFieldMaxDepth)
85-
(Some(ukSer), Some(ukDe))
85+
(Some(skSer), Some(skDe))
8686
} else {
8787
(None, None)
8888
}
8989
Some(AvroEncoderSpec(
90-
keySer, keyDe, valueSerializer, valueDeserializer, userKeySerializer, userKeyDeserializer))
90+
keySer, keyDe, valueSerializer, valueDeserializer, suffixKeySer, suffixKeyDe))
9191
} else {
9292
None
9393
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ class PrefixKeyScanStateEncoder(
287287
),
288288
encodeUnsafeRow(
289289
remainingKeyProjection(row),
290-
avroEnc.get.userKeySerializer.get,
290+
avroEnc.get.suffixKeySerializer.get,
291291
remainingKeyAvroType,
292292
out
293293
)
@@ -335,7 +335,7 @@ class PrefixKeyScanStateEncoder(
335335
),
336336
decodeToUnsafeRow(
337337
remainingKeyEncoded,
338-
avroEnc.get.userKeyDeserializer.get,
338+
avroEnc.get.suffixKeyDeserializer.get,
339339
remainingKeyAvroType,
340340
remainingKeyProj
341341
)
@@ -703,7 +703,7 @@ class RangeKeyScanStateEncoder(
703703
val remainingEncoded = if (avroEnc.isDefined) {
704704
encodeUnsafeRow(
705705
remainingKeyProjection(row),
706-
avroEnc.get.userKeySerializer.get,
706+
avroEnc.get.suffixKeySerializer.get,
707707
remainingKeyAvroType,
708708
out
709709
)
@@ -767,7 +767,7 @@ class RangeKeyScanStateEncoder(
767767

768768
val remainingKeyDecoded = if (avroEnc.isDefined) {
769769
decodeToUnsafeRow(remainingKeyEncoded,
770-
avroEnc.get.userKeyDeserializer.get,
770+
avroEnc.get.suffixKeyDeserializer.get,
771771
remainingKeyAvroType, remainingKeyAvroProjection)
772772
} else {
773773
decodeToUnsafeRow(remainingKeyEncoded,

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ case class AvroEncoderSpec(
4545
keyDeserializer: AvroDeserializer,
4646
valueSerializer: AvroSerializer,
4747
valueDeserializer: AvroDeserializer,
48-
userKeySerializer: Option[AvroSerializer] = None,
49-
userKeyDeserializer: Option[AvroDeserializer] = None
48+
suffixKeySerializer: Option[AvroSerializer] = None,
49+
suffixKeyDeserializer: Option[AvroDeserializer] = None
5050
) extends Serializable
5151

5252
// Used to represent the schema of a column family in the state store

0 commit comments

Comments
 (0)