Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,14 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory) {

/**
* Create an instance using the supplied producer factory and autoFlush setting.
* Set autoFlush to true if you wish to synchronously interact with Kafka, calling
* {@link java.util.concurrent.Future#get()} on the result.
* <p>
* Set autoFlush to {@code true} if you have configured the producer's
* {@code linger.ms} to a non-default value and wish send operations on this template
* to occur immediately, regardless of that setting, or if you wish to block until the
* broker has acknowledged receipt according to the producer's {@code acks} property.
* @param producerFactory the producer factory.
* @param autoFlush true to flush after each send.
* @see Producer#flush()
*/
public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
this.producerFactory = producerFactory;
Expand Down
8 changes: 5 additions & 3 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,15 @@ public class KRequestingApplication {
}

@Bean
public ReplyingKafkaTemplate<String, String, String> kafkaTemplate(ProducerFactory<String, String> pf,
public ReplyingKafkaTemplate<String, String, String> kafkaTemplate(
ProducerFactory<String, String> pf,
KafkaMessageListenerContainer<String, String> replyContainer) {
return new ReplyingKafkaTemplate<>(pf, replyContainer);
}

@Bean
public KafkaMessageListenerContainer<String, String> replyContainer(ConsumerFactory<String, String> cf) {
public KafkaMessageListenerContainer<String, String> replyContainer(
ConsumerFactory<String, String> cf) {
ContainerProperties containerProperties = new ContainerProperties("kReplies");
return new KafkaMessageListenerContainer<>(cf, containerProperties);
}
Expand Down Expand Up @@ -1467,7 +1469,7 @@ public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offse
Notice that you can still access the batch headers too.

Starting with _versions 2.1.1_, the `org.springframework.core.convert.ConversionService` used by the default
`org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory` to reslove parameters for the invocation
`o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory` to reslove parameters for the invocation
of a listener method is supplied with all beans implementing any of the following interfaces:

- `org.springframework.core.convert.converter.Converter`
Expand Down
120 changes: 120 additions & 0 deletions src/reference/asciidoc/si-kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,125 @@ In most cases, this will be an `ErrorMessageSendingRecoverer` which will send th
When building `ErrorMessage` (for use in the `error-channel` or `recovery-callback`), you can customize the error message using the `error-message-strategy` property.
By default, a `RawRecordHeaderErrorMessageStrategy` is used; providing access to the converted message as well as the raw `ConsumerRecord`.


[[si-outbound-gateway]]
==== Outbound Gateway

The outbound gateway is for request/reply operations; it is different to most Spring Integration gateways in that the sending thread does not block in the gateway, the reply is processed on the reply listener container thread.
Of course, if user code invokes the gateway behind a synchronous https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#gateway[Messaging Gateway], the user thread will block there until the reply is received (or a timeout occurs).

IMPORTANT: the gateway will not accept requests until the reply container has been assigned its topics and partitions.
It is suggested that you add a `ConsumerRebalanceListener` to the template's reply container properties and wait for the `onPartitionsAssigned` call before sending messages to the gateway.

Here is an example of configuring a gateway, with Java Configuration:

[source, java]
----
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
----

Notice that the same class as the <<si-outbound, outbound channel adapter>> is used, the only difference being that the kafka template passed into the constructor is a `ReplyingKafkaTemplate` - see <<replying-template>> for more information.

The outbound topic, partition, key etc, are determined the same way as the outbound adapter.
The reply topic is determined as follows:

1. A message header `KafkaHeaders.REPLY_TOPIC`, if present (must have a `String` or `byte[]` value) - validated against the template's reply container subscribed topics.
2. If the template's `replyContainer` is subscribed to just one topic, it will be used.

You can also specify a `KafkaHeaders.REPLY_PARTITION` header to determine a specific partition to be used for replies.
Again, this is validated against the template's reply container subscriptions.

Configuring with the Java DSL:

[source, java]
----
@Bean
public IntegrationFlow outboundGateFlow(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return IntegrationFlows.from("kafkaRequests")
.handle(Kafka.outboundGateway(kafkaTemplate))
.channel("kafkaReplies")
.get();
}
----

Or:

[source, java]
----
@Bean
public IntegrationFlow outboundGateFlow() {
return IntegrationFlows.from("kafkaRequests")
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
.configureKafkaTemplate(t -> t.replyTimeout(30_000)))
.channel("kafkaReplies")
.get();
}
----

XML configuration is not currently available for this component.

[[si-inbound-gateway]]
==== Inbound Gateway

The inbound gateway is for request/reply operations.

Configuring an inbound gateway with Java Configuration:

[source, java]
----
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
AbstractMessageListenerContainer<Integer, String>container,
KafkaTemplate<Integer, String> replyTemplate) {

KafkaInboundGateway<Integer, String, String> gateway =
new KafkaInboundGateway<>(container, replyTemplate);
gateway.setRequestChannel(requests);
gateway.setReplyChannel(replies);
gateway.setReplyTimeout(30_000);
return gateway;
}
----

Configuring a simple upper case converter with the Java DSL:

[source, java]
----
@Bean
public IntegrationFlow serverGateway(
ConcurrentMessageListenerContainer<Integer, String> container,
KafkaTemplate<Integer, String> replyTemplate) {
return IntegrationFlows
.from(Kafka.inboundGateway(container, template)
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
----

Or:

[source, java]
----
@Bean
public IntegrationFlow serverGateway() {
return IntegrationFlows
.from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
producerFactory())
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
----

XML configuration is not currently available for this component.

[[message-conversion]]
==== Message Conversion

Expand Down Expand Up @@ -350,3 +469,4 @@ The 2.3.x branch introduced the following changes:
* Support `ConsumerAwareMessageListener` (`Consumer` is available in a message header)
* Update to Spring Integration 5.0 and Java 8
* Moved Java DSL to main project
* Added inbound and outbound gateways (3.0.2)