[FLINK-24379][Formats] Add support for Glue schema registry in Table API #17360
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review pull requests. Automated checks: last check on commit 6c5edf9 (Mon Sep 27 05:07:56 UTC 2021). Mention the bot in a comment to re-run the automated checks.

Review progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands: the @flinkbot bot supports the following commands:
Thanks for the PR! I left some comments regarding consistency for now. However, there seem to be some fundamental issues, such as incorrect formatting. Please make sure to follow the IDE setup guide and update the PR accordingly.
Can you please add an e2e test for the Table API support?
```java
@Override
public ChangelogMode getChangelogMode() {
    return ChangelogMode.upsert();
}
```
What does this mode refer to? Does this impact the type of connector? For instance, Kafka, which supports upsert, versus Kinesis, which does not (insert only)?
I'm actually a bit confused on this point. The Format interface requires implementing the getChangelogMode method, but I don't understand why this is tied to the format at all. If I'm using a Kafka-Upsert connector then I should get upsert records; if I'm using a generic Kafka stream I should expect all kinds of records. Both of these are totally orthogonal to the format used.
When I look at the other Format implementations, they all seem to use either ChangelogMode.all() or ChangelogMode.insertOnly(), with no obvious (to me) pattern for why a given Format implementation gets one or the other.
I've changed the code in these two instances to ChangelogMode.all() to match the subset of other formats that also do this, including the DebeziumAvroFormatFactory, but without a deeper understanding of how Format.getChangelogMode is used, I can't reason more precisely about what this should be.
The format can define a changelog, but it's up to the connector to defer to the format for providing the changelog mode. SocketDynamicTableSource in flink-examples is an example where the connector just defers this to the format.
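For readers following along, here is a minimal sketch of that deferral pattern. The class name `DeferringTableSource` and the wiring are illustrative assumptions, not code from this PR; only the delegation in `getChangelogMode()` is the point being made.

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

/** Hypothetical connector that defers its changelog mode to its format. */
public class DeferringTableSource implements ScanTableSource {

    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    private final DataType producedDataType;

    public DeferringTableSource(
            DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
            DataType producedDataType) {
        this.decodingFormat = decodingFormat;
        this.producedDataType = producedDataType;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // The connector does not hard-code a mode; it asks the format.
        return decodingFormat.getChangelogMode();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // The runtime wiring (building a source that feeds bytes into
        // decodingFormat.createRuntimeDecoder(context, producedDataType)) is
        // connector-specific and elided in this sketch; see
        // SocketDynamicTableSource in flink-examples for a complete version.
        throw new UnsupportedOperationException("Elided in this sketch");
    }

    @Override
    public DynamicTableSource copy() {
        return new DeferringTableSource(decodingFormat, producedDataType);
    }

    @Override
    public String asSummaryString() {
        return "Example source deferring changelog mode to its format";
    }
}
```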
So, I did a test of an actual Flink job last night and discovered that unless this is set to insertOnly, you can't create an upsert-kafka connector using it. So I clearly don't understand what this changelog mode refers to, but I'm just going to cope.
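A minimal sketch of the validation being described, assuming a hypothetical `UpsertFormatValidation` helper (this mirrors the observed behavior; it is not the actual upsert-kafka source code): an upsert-style connector produces its own changelog from raw records, so it can require that the value format itself be insert-only.

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

final class UpsertFormatValidation {

    private UpsertFormatValidation() {}

    static void validateValueFormat(ChangelogMode formatMode) {
        // An upsert connector derives UPDATE_AFTER/DELETE rows itself from
        // the raw records, so the format must emit plain inserts only.
        if (!formatMode.containsOnly(RowKind.INSERT)) {
            throw new IllegalArgumentException(
                    "The value format of an upsert connector must be insert-only.");
        }
    }
}
```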
We will also need a documentation update similar to this. If you do not have the capacity to contribute this, can you please raise a follow-up Jira and link it to this one?

We're receiving a lot of community feedback that new features are not, or not properly, documented. It would be really good if we can get the documentation in with this PR.

Responded to feedback, modified the changelog mode to match existing code, and added the format documentation.
{{< label "Format: Serialization Schema" >}}
{{< label "Format: Deserialization Schema" >}}
The Glue Schema Registry (``avro-glue``) format allows you to read records that were serialized by the ``com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer`` and to write records that can in turn be read by the ``com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer``. These records have their schemas stored out-of-band in a configured registry provided by the AWS Glue Schema Registry [service](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-schemas).
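For illustration, a table definition using this format might look like the following sketch. The ``avro-glue`` format name comes from the excerpt above, but the Kafka table and the ``avro-glue.*`` option keys are hypothetical placeholders, since the final option names were still under discussion in this PR.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AvroGlueTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'avro-glue' is the format name from the docs excerpt; the
        // 'avro-glue.*' keys are placeholders, not the final option names.
        tEnv.executeSql(
                "CREATE TABLE user_events (\n"
                        + "  user_id STRING,\n"
                        + "  event_type STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'user-events',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'avro-glue',\n"
                        + "  'avro-glue.region' = 'us-east-1',\n"
                        + "  'avro-glue.registry.name' = 'my-registry'\n"
                        + ")");
    }
}
```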
Please add "AWS" here: "The Glue Schema Registry" should read "The AWS Glue Schema Registry".
Will fix.
Format Options
----------------
Yes, these options have inconsistent naming conventions. No, I can't fix it. This is for consistency with the existing [AWS Glue client code](https://github.com/awslabs/aws-glue-schema-registry/blob/master/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java#L20).
If we want to enforce a naming convention consistent with other Flink formats/connectors, we could transform them in the factory.
I'm doing another pass on the naming of the options and will have it in the next commit. It should be more in line with other formats, and, as you suggest, I'm doing the translation inside the factory.
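A minimal sketch of that translation step, with hypothetical key names on both sides (the actual Flink option names and AWS property names may differ from what was merged): the factory accepts Flink-style option keys and rewrites them into the camelCase property names the AWS Glue Schema Registry client expects.

```java
import java.util.HashMap;
import java.util.Map;

final class GlueOptionTranslation {

    private GlueOptionTranslation() {}

    /** Translates Flink-style format options into Glue client properties. */
    static Map<String, Object> toGlueClientProperties(Map<String, String> formatOptions) {
        Map<String, Object> glueProps = new HashMap<>();
        // Flink convention: lowercase, dot/hyphen separated keys.
        // Glue client convention: camelCase property names.
        String region = formatOptions.get("aws.region");
        if (region != null) {
            glueProps.put("region", region);
        }
        String registry = formatOptions.get("registry.name");
        if (registry != null) {
            glueProps.put("registryName", registry);
        }
        String autoRegister = formatOptions.get("schema.auto-registration");
        if (autoRegister != null) {
            glueProps.put("schemaAutoRegistrationEnabled", Boolean.valueOf(autoRegister));
        }
        return glueProps;
    }
}
```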
```yaml
name: Avro AWS Glue Schema Registry
maven: flink-avro-glue-schema-registry
category: format
sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro-glue-schema-registry/$version/flink-avro-glue-schema-registry-$version.jar
```
I believe this needs to include the $scala_version, and if possible, only render for Scala 2.12
The module is pure Java, and the only dependencies it has that are Scala-binary-versioned are test or provided scope. The module is currently located here in the central repository, so I don't think this is the case.
This has recently changed: since 1.15, Flink will only support Scala 2.12. You can see the new artifact here.
```xml
<!-- The above dependency hard-codes the Scala 2.12 binary version of this jar, so we replace it -->
<dependency>
    <groupId>com.kjetland</groupId>
    <artifactId>mbknor-jackson-jsonschema_${scala.binary.version}</artifactId>
```
@jherico Does this change mean that the format will also be compatible with Scala 2.11? Although 2.11 has recently been removed.
Bump: Here is an example. Note that keys need to be provided via
@jherico @dannycranmer Is there anything we can do to move this PR forward?
@MartijnVisser we do not have capacity to pick it up right now. If we do not hear back from @jherico then we could potentially pick it up sometime before the 1.16 release.
@dannycranmer if I create a
@dannycranmer having spent most of my working day on this today, I've come to the conclusion that this class looks to be a better example on which to pattern a new end-to-end test. Specifically, it does almost precisely what I want to do in my test, but uses the Confluent registry, whereas the test you linked doesn't use the Table API at all, as far as I can tell. Please let me know if you concur.
Hello @jherico, sorry for the delay. Yes, I concur; the test I linked is based on the DataStream API.

If this is still a concern based on the recent question, then yes please, removing duplicated code would be helpful.
This connector has been moved to https://github.com/apache/flink-connector-aws/tree/main/flink-formats-aws/flink-avro-glue-schema-registry. Closing PR. Please reopen targeting flink-connector-aws.
Fix for https://issues.apache.org/jira/browse/FLINK-24379
What is the purpose of the change
Add the missing components required to use the AWS Glue Schema Registry in Table connectors.
Brief change log
flink-formats/flink-avro-glue-schema-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no public API changed

Documentation