[SPARK-50599][SQL] Create the DataEncoder trait that allows for Avro and UnsafeRow encoding #48944
Overall looking good! Left some comments; there is still some cleanup that can be done.
| def encodeKey(row: UnsafeRow): Array[Byte] | ||
| def encodeRemainingKey(row: UnsafeRow): Array[Byte] | ||
| def encodePrefixKeyForRangeScan(row: UnsafeRow): Array[Byte] | ||
| def encodeValue(row: UnsafeRow): Array[Byte] |
can you add docs for these actually? What additional information would be needed for these?
For example, when would encodeRemainingKey be used? What schema information needs to be passed in for this?
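To make the question concrete, here is a minimal sketch of how a key row could be split into the part used for prefix matching and the remaining part that a method like encodeRemainingKey would receive. It mirrors the `keySchema.take(numColsPrefixKey)` / `keySchema.drop(numColsPrefixKey)` split that appears later in this PR. `KeyRow` and `PrefixKeySplitter` are hypothetical stand-ins, not Spark classes, and a plain `Vector` stands in for UnsafeRow.

```scala
// Hypothetical stand-in for a key row: a plain vector of column values (not UnsafeRow).
final case class KeyRow(columns: Vector[Any])

// Hypothetical helper showing which columns each half of the key would contain.
final class PrefixKeySplitter(numColsPrefixKey: Int) {
  // First numColsPrefixKey columns: the part a prefix scan matches on.
  def prefixPart(key: KeyRow): KeyRow = KeyRow(key.columns.take(numColsPrefixKey))

  // Everything after the prefix: the part an encodeRemainingKey-style method would see.
  def remainingPart(key: KeyRow): KeyRow = KeyRow(key.columns.drop(numColsPrefixKey))
}

object PrefixSplitDemo {
  val splitter = new PrefixKeySplitter(numColsPrefixKey = 1)
  val key: KeyRow = KeyRow(Vector("user-1", 42L, "click"))
  val prefix: KeyRow = splitter.prefixPart(key)
  val remaining: KeyRow = splitter.remainingPart(key)
}
```

Under these assumptions the only schema information the splitter needs is the number of prefix columns; the scaladoc for the real methods could state the equivalent requirement for each encoder.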
| def decodeKey(bytes: Array[Byte]): UnsafeRow | ||
| def decodeRemainingKey(bytes: Array[Byte]): UnsafeRow | ||
| def decodePrefixKeyForRangeScan(bytes: Array[Byte]): UnsafeRow | ||
| def decodeValue(bytes: Array[Byte]): UnsafeRow |
ditto
| (RocksDBStateEncoder.getKeyEncoder(dataEncoder, keyStateEncoderSpec, useColumnFamilies, | ||
| Some(newColFamilyId)), RocksDBStateEncoder.getValueEncoder(dataEncoder, valueSchema, | ||
| useMultipleValuesPerKey))) |
nit: can you make these one parameter per line please? maybe define them outside of the putIfAbsent first
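A rough sketch of the layout being asked for: build the two encoders first, one argument per line, and only then call putIfAbsent. `KeyEncoder`, `ValueEncoder`, and `EncoderRegistry` are hypothetical stand-ins for illustration; the real encoder types and map live in the PR.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins for the real key and value encoder types.
final case class KeyEncoder(spec: String, useColumnFamilies: Boolean, colFamilyId: Option[Short])
final case class ValueEncoder(schema: String, useMultipleValuesPerKey: Boolean)

object EncoderRegistry {
  private val encoders = new ConcurrentHashMap[String, (KeyEncoder, ValueEncoder)]()

  def register(
      colFamilyName: String,
      keySpec: String,
      valueSchema: String,
      useColumnFamilies: Boolean,
      useMultipleValuesPerKey: Boolean,
      colFamilyId: Short): (KeyEncoder, ValueEncoder) = {
    // Defined outside putIfAbsent, one argument per line.
    val keyEncoder = KeyEncoder(
      keySpec,
      useColumnFamilies,
      Some(colFamilyId))
    val valueEncoder = ValueEncoder(
      valueSchema,
      useMultipleValuesPerKey)
    // Keeps the first registration for a column family; later calls do not overwrite it.
    encoders.putIfAbsent(colFamilyName, (keyEncoder, valueEncoder))
    encoders.get(colFamilyName)
  }
}
```

Naming the encoders up front keeps the putIfAbsent call on one short line and makes each constructor argument easy to review in a diff.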
| (RocksDBStateEncoder.getKeyEncoder(dataEncoder, keyStateEncoderSpec, | ||
| useColumnFamilies, defaultColFamilyId), | ||
| RocksDBStateEncoder.getValueEncoder(dataEncoder, valueSchema, useMultipleValuesPerKey))) |
ditto
| stateStoreEncoding match { | ||
| case "avro" => Some( | ||
| RocksDBStateStoreProvider.avroEncoderMap.get( | ||
| avroEncCacheKey, | ||
| new java.util.concurrent.Callable[AvroEncoder] { | ||
| override def call(): AvroEncoder = createAvroEnc(keyStateEncoderSpec, valueSchema) | ||
| case "avro" => | ||
| RocksDBStateStoreProvider.dataEncoderCache.get( | ||
| encoderCacheKey, | ||
| new java.util.concurrent.Callable[AvroStateEncoder] { | ||
| override def call(): AvroStateEncoder = { | ||
| val avroEncoder = createAvroEnc(keyStateEncoderSpec, valueSchema) | ||
| new AvroStateEncoder(keyStateEncoderSpec, valueSchema, avroEncoder) | ||
| } | ||
| } | ||
| ) | ||
| case "unsaferow" => | ||
| RocksDBStateStoreProvider.dataEncoderCache.get( | ||
| encoderCacheKey, | ||
| new java.util.concurrent.Callable[UnsafeRowDataEncoder] { | ||
| override def call(): UnsafeRowDataEncoder = { | ||
| new UnsafeRowDataEncoder(keyStateEncoderSpec, valueSchema) | ||
| } |
nit: maybe change the order of the match-case with the cache.get?
assert(Set("avro", "unsaferow").contains(stateStoreEncoding))
RocksDBStateStoreProvider.dataEncoderCache.get(
  encoderCacheKey,
  new java.util.concurrent.Callable[DataEncoder] {
    override def call(): DataEncoder = {
      if (stateStoreEncoding == "avro") {
        ...
      } else {
        ...
      }
    }
  }
)
| dataEncoder: RocksDBDataEncoder, | ||
| valueSchema: StructType, | ||
| useMultipleValuesPerKey: Boolean, | ||
| avroEnc: Option[AvroEncoder] = None): RocksDBValueStateEncoder = { |
ditto
| } | ||
| } | ||
|
|
||
| object RocksDBStateEncoder extends Logging { |
Can you add scaladoc for all the methods below?
| keyStateEncoderSpec match { | ||
| case NoPrefixKeyStateEncoderSpec(keySchema) => | ||
| new NoPrefixKeyStateEncoder(dataEncoder, keySchema, useColumnFamilies, virtualColFamilyId) | ||
|
|
||
| case PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) => | ||
| new PrefixKeyScanStateEncoder(dataEncoder, keySchema, numColsPrefixKey, | ||
| useColumnFamilies, virtualColFamilyId) | ||
|
|
||
| case RangeKeyScanStateEncoderSpec(keySchema, orderingOrdinals) => | ||
| new RangeKeyScanStateEncoder(dataEncoder, keySchema, orderingOrdinals, | ||
| useColumnFamilies, virtualColFamilyId) | ||
|
|
||
| case _ => | ||
| throw new IllegalArgumentException(s"Unsupported key state encoder spec: " + | ||
| s"$keyStateEncoderSpec") | ||
| } | ||
| } |
Should the encoder specs expose a method instead?
def toEncoder(dataEncoder: DataEncoder, useColumnFamilies, virtualColFamilyId): RocksDBKeyStateEncoderBase
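As a sketch of what this suggestion might look like, each spec can build its own encoder so the factory no longer needs a pattern match or a `case _ =>` fallback. Every type below is a hypothetical stand-in, not a real Spark class. The parameter types `Boolean` and `Option[Short]` are assumptions, since the comment above leaves them out.

```scala
// Hypothetical stand-ins; none of these are the real Spark classes.
trait DataEncoder
sealed trait KeyStateEncoder
final case class NoPrefixEncoder(numKeyCols: Int) extends KeyStateEncoder
final case class PrefixScanEncoder(numKeyCols: Int, numColsPrefixKey: Int) extends KeyStateEncoder

sealed trait KeyStateEncoderSpec {
  // Each spec knows how to build its own encoder.
  // Parameter types are assumptions; the review comment does not state them.
  def toEncoder(
      dataEncoder: DataEncoder,
      useColumnFamilies: Boolean,
      virtualColFamilyId: Option[Short]): KeyStateEncoder
}

final case class NoPrefixSpec(keyCols: Seq[String]) extends KeyStateEncoderSpec {
  override def toEncoder(
      dataEncoder: DataEncoder,
      useColumnFamilies: Boolean,
      virtualColFamilyId: Option[Short]): KeyStateEncoder =
    NoPrefixEncoder(keyCols.size)
}

final case class PrefixScanSpec(keyCols: Seq[String], numColsPrefixKey: Int)
  extends KeyStateEncoderSpec {
  override def toEncoder(
      dataEncoder: DataEncoder,
      useColumnFamilies: Boolean,
      virtualColFamilyId: Option[Short]): KeyStateEncoder =
    PrefixScanEncoder(keyCols.size, numColsPrefixKey)
}

object EncoderFactory {
  // The factory simply delegates to the spec; no match on the spec type is needed.
  def getKeyEncoder(
      dataEncoder: DataEncoder,
      spec: KeyStateEncoderSpec,
      useColumnFamilies: Boolean,
      virtualColFamilyId: Option[Short] = None): KeyStateEncoder =
    spec.toEncoder(dataEncoder, useColumnFamilies, virtualColFamilyId)
}
```

With this shape, adding a new spec means implementing one method on the new class rather than extending a match in the factory.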
| // Prefix Key schema and projection definitions used by the Avro Serializers | ||
| // and Deserializers | ||
| private val prefixKeySchema = StructType(keySchema.take(numColsPrefixKey)) | ||
| private lazy val prefixKeyAvroType = SchemaConverters.toAvroType(prefixKeySchema) | ||
| private val prefixKeyProj = UnsafeProjection.create(prefixKeySchema) | ||
|
|
||
| // Remaining Key schema and projection definitions used by the Avro Serializers | ||
| // and Deserializers | ||
| private val remainingKeySchema = StructType(keySchema.drop(numColsPrefixKey)) | ||
| private lazy val remainingKeyAvroType = SchemaConverters.toAvroType(remainingKeySchema) | ||
| private val remainingKeyProj = UnsafeProjection.create(remainingKeySchema) |
some of these need to be cleaned up. They're unused and no longer needed
| private val rangeScanAvroSchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan( | ||
| StructType(rangeScanKeyFieldsWithOrdinal.map(_._1).toArray)) |
these are also no longer used. Please clean them up
Mostly comments on scaladoc style; otherwise LGTM!
| /** Interface for encoding and decoding state store data between UnsafeRow and raw bytes. | ||
| * | ||
| * @note All encode methods expect non-null input rows. Handling of null values is left to the | ||
| * implementing classes. | ||
| */ |
is this in the right place?
| */ | ||
| def encodeKey(row: UnsafeRow): Array[Byte] | ||
|
|
||
| /** Encodes the non-prefix portion of a key row. Used with prefix scan and |
In scaladoc, text should start on the second line, not the first.
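For reference, a small sketch of the layout this comment asks for: the opening `/**` sits on its own line, the text starts on the second line, and the asterisks line up under the first one. `ExampleEncoder` and `Utf8ExampleEncoder` are hypothetical names used only to show the comment layout. They are not part of the PR, and `Seq[String]` stands in for a real row type.

```scala
import java.nio.charset.StandardCharsets

/**
 * Stand-in encoder interface, used only to illustrate the comment layout.
 */
trait ExampleEncoder {
  /**
   * Encodes the non-prefix portion of a key.
   *
   * @param columns the remaining key columns, in schema order
   * @return the encoded bytes
   */
  def encodeRemainingKey(columns: Seq[String]): Array[Byte]
}

/**
 * Toy implementation that joins the columns with commas and encodes them as UTF-8.
 */
object Utf8ExampleEncoder extends ExampleEncoder {
  override def encodeRemainingKey(columns: Seq[String]): Array[Byte] =
    columns.mkString(",").getBytes(StandardCharsets.UTF_8)
}
```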
| keyStateEncoderSpec: KeyStateEncoderSpec, | ||
| valueSchema: StructType | ||
| ): AvroEncoder = { |
please fix the indent
|
|
||
|
|
uber nit: delete extra empty lines
| } | ||
| } | ||
|
|
||
| /** Factory object for creating state encoders used by RocksDB state store. |
move text to next line
| def jsonValue: JValue | ||
| def json: String = compact(render(jsonValue)) | ||
|
|
||
| /** Creates a RocksDBKeyStateEncoder for this specification. |
ditto on scaladoc
| if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) { | ||
| throw StateStoreErrors.incorrectNumOrderingColsForPrefixScan(numColsPrefixKey.toString) | ||
| } | ||
| override def toEncoder( |
nit: new line please
...apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala
| case PrefixKeyScanStateEncoderSpec(_, _) => | ||
| encodeUnsafeRowToAvro(row, avroEncoder.suffixKeySerializer.get, remainingKeyAvroType, out) | ||
| case RangeKeyScanStateEncoderSpec(_, _) => | ||
| encodeUnsafeRowToAvro(row, avroEncoder.keySerializer, remainingKeyAvroType, out) |
does this need to be suffixKeySerializer?
No, this is right - RangeKeyScan encoder doesn't use Avro serialization for both parts of the key
LGTM, but you missed one comment on scaladoc: https://github.com/apache/spark/pull/48944/files#r1887807431. Once that is done, @HeartSaVioR, can you please help merge this?
...core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
The comment link does not show the comment; I assume the comment is outdated because of a new commit.
…and UnsafeRow encoding
### What changes were proposed in this pull request?
Currently, we use the internal byte representation to store state for stateful streaming operators in the StateStore. This PR introduces Avro serialization and deserialization capabilities in the RocksDBStateEncoder so that we can instead use Avro encoding to store state. This is currently enabled for the TransformWithState operator via SQLConf to support all functionality supported by TWS
### Why are the changes needed?
UnsafeRow is an inherently unstable format that makes no guarantees of being backwards-compatible. Therefore, if the format changes between Spark releases, this could cause StateStore corruptions. Avro is more stable, and inherently enables schema evolution.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Amended and added to unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes apache#48944 from ericm-db/avro-ss.
Lead-authored-by: Eric Marnadi <[email protected]>
Co-authored-by: Eric Marnadi <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
| storeConf: StateStoreConf, | ||
| hadoopConf: Configuration, | ||
| useMultipleValuesPerKey: Boolean = false): StateStore = { | ||
| hadoopConf.set(StreamExecution.RUN_ID_KEY, storeProviderId.queryRunId.toString) |
Random late comment on something I noticed in the code: this is already called 4 lines below.
What changes were proposed in this pull request?
Currently, we use the internal byte representation (UnsafeRow) to store state for stateful streaming operators in the StateStore. This PR introduces Avro serialization and deserialization capabilities in the RocksDBStateEncoder so that we can instead use Avro encoding to store state. This is currently enabled for the TransformWithState (TWS) operator via SQLConf, and supports all functionality supported by TWS.
Why are the changes needed?
UnsafeRow is an inherently unstable format that makes no guarantees of being backwards-compatible. Therefore, if the format changes between Spark releases, this could cause StateStore corruptions. Avro is more stable, and inherently enables schema evolution.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Amended and added to unit tests
Was this patch authored or co-authored using generative AI tooling?
No