Kafka serializer and deserializer in Java for working with schemas defined in the Buf Schema Registry.

bufbuild/bsr-kafka-serde-java

bsr-kafka-serde-java

bsr-kafka-serde-java provides a Kafka serializer and deserializer in Java for working with schemas defined in the Buf Schema Registry. It pairs with Bufstream's semantic validation feature, using Kafka record headers to automatically convert record values to and from Protobuf.

Usage

Gradle

dependencies {
    implementation("build.buf.bsr.kafka:bsr-kafka-serde:<version>")
}

Maven

<dependency>
    <groupId>build.buf.bsr.kafka</groupId>
    <artifactId>bsr-kafka-serde</artifactId>
    <version>${bsr-kafka-serde.version}</version>
</dependency>

Producer

The producer needs no configuration beyond setting the standard ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ProtoSerializer. The example below publishes a Protobuf EmailUpdated message to a topic using the BSR serializer.

Producer Example
import build.buf.bsr.kafka.ProtoSerializer;
import build.buf.bsr.kafka.gen.bufstream.demo.v1.EmailUpdated;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExampleProducer {
    public static void main(String[] args) {
        Properties producerConfig = new Properties();
        producerConfig.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerConfig.setProperty(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Set the value serializer to encode the message as Protobuf bytes
        producerConfig.setProperty(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtoSerializer.class.getName());
        EmailUpdated emailUpdateMsg =
                EmailUpdated.newBuilder()
                        .setId(UUID.randomUUID().toString())
                        .setNewEmailAddress("<new-email-address>")
                        .build();
        try (KafkaProducer<String, EmailUpdated> producer = new KafkaProducer<>(producerConfig)) {
            producer.send(new ProducerRecord<>("my-topic", emailUpdateMsg.getId(), emailUpdateMsg));
        }
    }
}
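
Because the serializer is selected through a standard Kafka setting, the same producer configuration could also live in a properties file instead of code. The sketch below is illustrative only: it assumes the string keys that Kafka's config constants resolve to (`bootstrap.servers`, `key.serializer`, `value.serializer`), and the class name `ProducerPropertiesSketch` and the inline file contents are hypothetical, not part of this library.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ProducerPropertiesSketch {
    // Hypothetical contents of a producer.properties file (not shipped with the library).
    static final String EXAMPLE_FILE = String.join("\n",
            "bootstrap.servers=localhost:9092",
            "key.serializer=org.apache.kafka.common.serialization.StringSerializer",
            "value.serializer=build.buf.bsr.kafka.ProtoSerializer");

    // Parses properties-file text into a Properties object that could be handed to KafkaProducer.
    static Properties load(String text) {
        Properties props = new Properties();
        try (StringReader reader = new StringReader(text)) {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException("could not parse producer properties", e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(EXAMPLE_FILE);
        // Show which value serializer the producer would be configured with.
        System.out.println(props.getProperty("value.serializer"));
    }
}
```

The resulting `Properties` object plays the same role as `producerConfig` in the example above.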

Consumer

The consumer supports the following config settings:

| Setting | Description |
| --- | --- |
| bsr.host | Buf Schema Registry hostname (e.g. buf.build). Required. |
| bsr.token | Buf Schema Registry API token. |
| value.type | The class name of the Protobuf message to decode into. |

If value.type is not specified, messages are decoded as DynamicMessage.
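
As a rough sketch of opting in to typed decoding, the helper below adds `value.type` to an existing set of consumer properties. It uses the setting name listed above as a plain string key and assumes the value is the fully qualified name of a generated message class (here the `EmailUpdated` class from the producer example); `ValueTypeSketch` and `withValueType` are hypothetical names for illustration.

```java
import java.util.Properties;

public class ValueTypeSketch {
    // Returns a copy of the given consumer properties with value.type set,
    // leaving the original untouched so a DynamicMessage-based config can still be reused.
    static Properties withValueType(Properties base, String messageClassName) {
        Properties copy = new Properties();
        copy.putAll(base);
        // Assumption: value.type takes a fully qualified class name.
        copy.setProperty("value.type", messageClassName);
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bsr.host", "buf.build");

        Properties typed = withValueType(
                base, "build.buf.bsr.kafka.gen.bufstream.demo.v1.EmailUpdated");

        // base has no value.type, so values would be decoded as DynamicMessage.
        System.out.println("base value.type:  " + base.getProperty("value.type"));
        System.out.println("typed value.type: " + typed.getProperty("value.type"));
    }
}
```

With `value.type` set this way, the consumer could presumably be declared as `Consumer<String, EmailUpdated>` rather than `Consumer<String, Message>`, though that depends on how the deserializer is typed.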

Consumer Example
import build.buf.bsr.kafka.ProtoDeserializer;
import build.buf.bsr.kafka.ProtoDeserializerConfig;
import com.google.protobuf.Message;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ExampleConsumer {
  public static void main(String[] args) {
    Properties consumerConfig = new Properties();
    consumerConfig.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
    consumerConfig.setProperty(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfig.setProperty(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtoDeserializer.class.getName());
    // Replace the following two values with your BSR hostname and a bot user API token for that BSR instance
    consumerConfig.setProperty(ProtoDeserializerConfig.BSR_HOST_CONFIG, "<bsr-host>");
    consumerConfig.setProperty(ProtoDeserializerConfig.BSR_TOKEN_CONFIG, "<api-token>");

    try (Consumer<String, Message> consumer = new KafkaConsumer<>(consumerConfig)) {
      consumer.subscribe(List.of("my-topic"));
      ConsumerRecords<String, Message> records = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<String, Message> record : records) {
        System.out.println(record.value());
      }
    }
  }
}
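
To avoid hard-coding the API token, the BSR settings can be read from the environment before the consumer is created. The sketch below is one way to do that under stated assumptions: the environment variable names `BSR_HOST` and `BSR_TOKEN` and the class `BsrCredentialsSketch` are hypothetical, the keys are the setting names from the table above, and the token is treated as optional because only `bsr.host` is marked as required.

```java
import java.util.Map;
import java.util.Properties;

public class BsrCredentialsSketch {
    // Copies BSR settings from an environment-style map into consumer properties.
    // Fails fast if the required host is missing; skips the token when it is absent.
    static Properties apply(Properties props, Map<String, String> env) {
        String host = env.get("BSR_HOST");
        if (host == null || host.trim().isEmpty()) {
            throw new IllegalStateException("BSR_HOST must be set (bsr.host is required)");
        }
        props.setProperty("bsr.host", host.trim());

        String token = env.get("BSR_TOKEN");
        if (token != null && !token.isEmpty()) {
            props.setProperty("bsr.token", token);
        }
        return props;
    }

    public static void main(String[] args) {
        // A fixed map keeps this sketch runnable anywhere; in practice pass System.getenv().
        Map<String, String> env = Map.of("BSR_HOST", "buf.build");
        Properties props = apply(new Properties(), env);
        System.out.println("bsr.host=" + props.getProperty("bsr.host"));
        System.out.println("token configured: " + props.containsKey("bsr.token"));
    }
}
```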

Status: Beta

This library isn't stable yet, but its overall shape is unlikely to change drastically. Future changes are expected to be minor.

Legal

Offered under the Apache 2 license.
