
Conversation

@ericm-db
Contributor

@ericm-db ericm-db commented Nov 23, 2024

What changes were proposed in this pull request?

Currently, we use the internal byte representation to store state for stateful streaming operators in the StateStore. This PR introduces Avro serialization and deserialization capabilities in the RocksDBStateEncoder so that we can instead use Avro encoding to store state. This is currently enabled for the TransformWithState operator via SQLConf, and supports all functionality supported by TWS.
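For context, a minimal sketch of how a user might opt into the new encoding. The conf key below is an assumption based on this PR's description; it is not confirmed anywhere in this thread.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("tws-avro-encoding")
  // Ask the RocksDB state store to encode state with Avro instead of UnsafeRow
  // (assumed conf key; check SQLConf for the actual name).
  .config("spark.sql.streaming.stateStore.encodingFormat", "avro")
  .getOrCreate()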

Why are the changes needed?

UnsafeRow is an inherently unstable format that makes no backwards-compatibility guarantees. Therefore, if the format changes between Spark releases, this could corrupt the StateStore. Avro is more stable and inherently supports schema evolution.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Amended existing unit tests and added new ones.

Was this patch authored or co-authored using generative AI tooling?

No

@ericm-db ericm-db changed the title [WIP] init [SPARK-50017] Support Avro encoding for TransformWithState operator Nov 23, 2024
@HyukjinKwon HyukjinKwon changed the title [SPARK-50017] Support Avro encoding for TransformWithState operator [SPARK-50017][SQL] Support Avro encoding for TransformWithState operator Nov 24, 2024
Contributor

@brkyvz brkyvz left a comment

Overall looking good! Left some comments; we're also missing some cleanup that can be done.

Comment on lines 64 to 67
def encodeKey(row: UnsafeRow): Array[Byte]
def encodeRemainingKey(row: UnsafeRow): Array[Byte]
def encodePrefixKeyForRangeScan(row: UnsafeRow): Array[Byte]
def encodeValue(row: UnsafeRow): Array[Byte]
Contributor

can you add docs for these actually? What additional information would be needed for these?

For example, when would encodeRemainingKey be used? What schema information needs to be passed in for this?
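For illustration, a sketch of the kind of scaladoc being asked for here, with semantics inferred from the method names and the prefix/range-scan code later in the diff (not the author's actual docs):

/**
 * Encodes the non-prefix ("remaining") columns of a key row.
 *
 * Used by the prefix-scan and range-scan encoders, which split the key into a
 * prefix that drives the scan and a suffix that is encoded separately. The
 * implementation needs to know the remaining-key schema, i.e. the key schema
 * minus the leading prefix columns.
 *
 * @param row an UnsafeRow containing only the remaining key columns
 * @return the serialized bytes for the remaining portion of the key
 */
def encodeRemainingKey(row: UnsafeRow): Array[Byte]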

Comment on lines 69 to 72
def decodeKey(bytes: Array[Byte]): UnsafeRow
def decodeRemainingKey(bytes: Array[Byte]): UnsafeRow
def decodePrefixKeyForRangeScan(bytes: Array[Byte]): UnsafeRow
def decodeValue(bytes: Array[Byte]): UnsafeRow
Contributor

ditto

Comment on lines 86 to 88
(RocksDBStateEncoder.getKeyEncoder(dataEncoder, keyStateEncoderSpec, useColumnFamilies,
Some(newColFamilyId)), RocksDBStateEncoder.getValueEncoder(dataEncoder, valueSchema,
useMultipleValuesPerKey)))
Contributor

nit: can you make these one parameter per line please? maybe define them outside of the putIfAbsent first

Comment on lines 396 to 398
(RocksDBStateEncoder.getKeyEncoder(dataEncoder, keyStateEncoderSpec,
useColumnFamilies, defaultColFamilyId),
RocksDBStateEncoder.getValueEncoder(dataEncoder, valueSchema, useMultipleValuesPerKey)))
Contributor

ditto

Comment on lines 646 to 663
stateStoreEncoding match {
  case "avro" =>
    RocksDBStateStoreProvider.dataEncoderCache.get(
      encoderCacheKey,
      new java.util.concurrent.Callable[AvroStateEncoder] {
        override def call(): AvroStateEncoder = {
          val avroEncoder = createAvroEnc(keyStateEncoderSpec, valueSchema)
          new AvroStateEncoder(keyStateEncoderSpec, valueSchema, avroEncoder)
        }
      }
    )
  case "unsaferow" =>
    RocksDBStateStoreProvider.dataEncoderCache.get(
      encoderCacheKey,
      new java.util.concurrent.Callable[UnsafeRowDataEncoder] {
        override def call(): UnsafeRowDataEncoder = {
          new UnsafeRowDataEncoder(keyStateEncoderSpec, valueSchema)
        }
Contributor

nit: maybe change the order of the match-case with the cache.get?

assert(Set("avro", "unsaferow").contains(stateStoreEncoding))

RocksDBStateStoreProvider.dataEncoderCache.get(
  encoderCacheKey,
  new java.util.concurrent.Callable[DataEncoder] { () =>
    if (stateStoreEncoding == "avro") {
      ...
    } else {
      ...
    }
  }
)
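Filled in, that suggestion might look like the sketch below, assuming the AvroStateEncoder and UnsafeRowDataEncoder constructors shown in the diff above and a cache typed to the common DataEncoder supertype; this is an illustration, not the code as merged.

assert(Set("avro", "unsaferow").contains(stateStoreEncoding))

RocksDBStateStoreProvider.dataEncoderCache.get(
  encoderCacheKey,
  new java.util.concurrent.Callable[DataEncoder] {
    override def call(): DataEncoder = {
      // Build whichever encoder the conf asks for; both implement
      // DataEncoder, so a single cache can hold either kind.
      if (stateStoreEncoding == "avro") {
        val avroEncoder = createAvroEnc(keyStateEncoderSpec, valueSchema)
        new AvroStateEncoder(keyStateEncoderSpec, valueSchema, avroEncoder)
      } else {
        new UnsafeRowDataEncoder(keyStateEncoderSpec, valueSchema)
      }
    }
  }
)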

dataEncoder: RocksDBDataEncoder,
valueSchema: StructType,
useMultipleValuesPerKey: Boolean,
avroEnc: Option[AvroEncoder] = None): RocksDBValueStateEncoder = {
Contributor

ditto

}
}

object RocksDBStateEncoder extends Logging {
Contributor

Can you add scaladoc for all the methods below?

Comment on lines 800 to 816
keyStateEncoderSpec match {
  case NoPrefixKeyStateEncoderSpec(keySchema) =>
    new NoPrefixKeyStateEncoder(dataEncoder, keySchema, useColumnFamilies, virtualColFamilyId)

  case PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) =>
    new PrefixKeyScanStateEncoder(dataEncoder, keySchema, numColsPrefixKey,
      useColumnFamilies, virtualColFamilyId)

  case RangeKeyScanStateEncoderSpec(keySchema, orderingOrdinals) =>
    new RangeKeyScanStateEncoder(dataEncoder, keySchema, orderingOrdinals,
      useColumnFamilies, virtualColFamilyId)

  case _ =>
    throw new IllegalArgumentException(s"Unsupported key state encoder spec: " +
      s"$keyStateEncoderSpec")
}
}
Contributor

should the encoderSpecs expose a method instead:

def toEncoder(dataEncoder: DataEncoder, useColumnFamilies, virtualColFamilyId): RocksDBKeyStateEncoderBase
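A sketch of what that could look like on the spec hierarchy (names borrowed from the surrounding diff; the exact parameter types, e.g. Option[Short] for the virtual column family id, are assumptions):

sealed trait KeyStateEncoderSpec {
  def keySchema: StructType

  /** Builds the RocksDB key encoder corresponding to this spec. */
  def toEncoder(
      dataEncoder: RocksDBDataEncoder,
      useColumnFamilies: Boolean,
      virtualColFamilyId: Option[Short]): RocksDBKeyStateEncoderBase
}

case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends KeyStateEncoderSpec {
  override def toEncoder(
      dataEncoder: RocksDBDataEncoder,
      useColumnFamilies: Boolean,
      virtualColFamilyId: Option[Short]): RocksDBKeyStateEncoderBase = {
    // Each spec knows which concrete encoder it maps to, so the
    // factory-side match-case goes away.
    new NoPrefixKeyStateEncoder(dataEncoder, keySchema, useColumnFamilies, virtualColFamilyId)
  }
}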

Comment on lines 874 to 884
// Prefix Key schema and projection definitions used by the Avro Serializers
// and Deserializers
private val prefixKeySchema = StructType(keySchema.take(numColsPrefixKey))
private lazy val prefixKeyAvroType = SchemaConverters.toAvroType(prefixKeySchema)
private val prefixKeyProj = UnsafeProjection.create(prefixKeySchema)

// Remaining Key schema and projection definitions used by the Avro Serializers
// and Deserializers
private val remainingKeySchema = StructType(keySchema.drop(numColsPrefixKey))
private lazy val remainingKeyAvroType = SchemaConverters.toAvroType(remainingKeySchema)
private val remainingKeyProj = UnsafeProjection.create(remainingKeySchema)
Contributor

some of these need to be cleaned up. They're unused and no longer needed

Comment on lines 1059 to 1060
private val rangeScanAvroSchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan(
  StructType(rangeScanKeyFieldsWithOrdinal.map(_._1).toArray))
Contributor

these are also no longer used. Please clean them up

@ericm-db ericm-db requested a review from brkyvz December 13, 2024 21:54
Contributor

@brkyvz brkyvz left a comment

Basically comments on scaladoc style; otherwise LGTM!

Comment on lines 63 to 67
/** Interface for encoding and decoding state store data between UnsafeRow and raw bytes.
*
* @note All encode methods expect non-null input rows. Handling of null values is left to the
* implementing classes.
*/
Contributor

is this in the right place?

*/
def encodeKey(row: UnsafeRow): Array[Byte]

/** Encodes the non-prefix portion of a key row. Used with prefix scan and
Contributor

Text should start at the second line, not the first in scaladoc
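For reference, the requested layout, where the opening line of the scaladoc holds no text (a generic example, not the PR's actual doc text):

/**
 * Encodes the non-prefix portion of a key row. Used with prefix scan and
 * range scan encoders to serialize the suffix columns separately.
 */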

Comment on lines 513 to 515
keyStateEncoderSpec: KeyStateEncoderSpec,
valueSchema: StructType
): AvroEncoder = {
Contributor

please fix the indent

Comment on lines 481 to 482


Contributor

uber nit: delete extra empty lines

}
}

/** Factory object for creating state encoders used by RocksDB state store.
Contributor

move text to next line

def jsonValue: JValue
def json: String = compact(render(jsonValue))

/** Creates a RocksDBKeyStateEncoder for this specification.
Contributor

ditto on scaladoc

if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) {
throw StateStoreErrors.incorrectNumOrderingColsForPrefixScan(numColsPrefixKey.toString)
}
override def toEncoder(
Contributor

nit: new line please

@ericm-db ericm-db requested a review from brkyvz December 17, 2024 04:10
case PrefixKeyScanStateEncoderSpec(_, _) =>
  encodeUnsafeRowToAvro(row, avroEncoder.suffixKeySerializer.get, remainingKeyAvroType, out)
case RangeKeyScanStateEncoderSpec(_, _) =>
  encodeUnsafeRowToAvro(row, avroEncoder.keySerializer, remainingKeyAvroType, out)
Contributor

does this need to be suffixKeySerializer?

Contributor Author

No, this is right - the RangeKeyScan encoder doesn't use Avro serialization for both parts of the key.

@ericm-db ericm-db requested a review from brkyvz December 17, 2024 05:19
@ericm-db ericm-db changed the title [SPARK-50017][SQL] Support Avro encoding for TransformWithState operator [SPARK-50599][SQL] Support Avro encoding for TransformWithState operator Dec 17, 2024
@ericm-db ericm-db changed the title [SPARK-50599][SQL] Support Avro encoding for TransformWithState operator [SPARK-50599][SQL] Create the DataEncoder trait that allows for Avro and UnsafeRow encoding Dec 17, 2024
Contributor

@brkyvz brkyvz left a comment

LGTM, but you missed one comment on scaladoc: https://github.com/apache/spark/pull/48944/files#r1887807431. Once done, @HeartSaVioR, can you please help merge this?

@HeartSaVioR
Contributor

The comment link does not show the comment, which I assume means the comment became outdated via a new commit.
I'm merging on behalf of @brkyvz. Thanks! Merging to master.

jabbaugh pushed a commit to jabbaugh/spark that referenced this pull request Dec 18, 2024
…and UnsafeRow encoding


Closes apache#48944 from ericm-db/avro-ss.

Lead-authored-by: Eric Marnadi <[email protected]>
Co-authored-by: Eric Marnadi <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
storeConf: StateStoreConf,
hadoopConf: Configuration,
useMultipleValuesPerKey: Boolean = false): StateStore = {
hadoopConf.set(StreamExecution.RUN_ID_KEY, storeProviderId.queryRunId.toString)
Contributor

Random late comment I noticed in the code: this is already called 4 lines below.
