Skip to content

Commit 2e49859

Browse files
More Polishing for M2
Add package-info.java Increase Unit Test Coverage Combine Acknowledgement Interfaces Support more than 10 maxMessagesPerPoll Add AcknowledgementOrdering.ORDERED_BY_GROUP Add BatchAcknowledgement Add AsyncBatchAcknowledgement Add tests Add AcknowledgementResultCallback Add tests Replace ExpressionHelper with StringValueResolver Add maxMessagesPerPoll to SqsListener Disable container reuse for TC Use ApplicationContextRunner in tests Address review suggestions Clean up tests Fix InterceptorIntegrationTest Make ComponentFactory a collection Add Unit Tests Apply Spotless
1 parent 53c7145 commit 2e49859

File tree

143 files changed

+8477
-2234
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

143 files changed

+8477
-2234
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 130 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -372,9 +372,9 @@ A number of possible argument types are allowed in the listener method's signatu
372372
- `List<Message<MyPojo>>` - Enables batch mode and receives the batch that was polled from SQS along with headers.
373373
- `@Header(String headerName)` - provides the specified header.
374374
- `@Headers` - provides the `MessageHeaders` or a `Map<String, Object>`
375-
- `Acknowledgement` - provides the `.acknowledge()` method that can be used to manually acknowledge the message.
375+
- `Acknowledgement` - provides methods for manually acknowledging messages for single message listeners.
376376
AcknowledgementMode must be set to `MANUAL` (see <<Acknowledging Messages>>)
377-
- `AsyncAcknowledgement` - provides the `.acknowledgeAsync()` method that can be used to manually acknowledge the message.
377+
- `BatchAcknowledgement` - provides methods for manually acknowledging partial or whole message batches for batch listeners.
378378
AcknowledgementMode must be set to `MANUAL` (see <<Acknowledging Messages>>)
379379
- `Visibility` - provides the `changeTo()` method that enables changing the message's visibility to the provided value.
380380
- `QueueAttributes` - provides queue attributes for the queue that received the message.
@@ -386,45 +386,37 @@ Here's a sample with many arguments:
386386
[source, java]
387387
----
388388
@SqsListener("${my-queue-name}")
389-
public void listen(Message<MyPojo> message, MyPojo pojo, MessageHeaders headers, Acknowledgement ack, Visibility visibility, QueueAttributes queueAttributes, AsyncAcknowledgement asyncAck, software.amazon.awssdk.services.sqs.model.Message originalMessage) {
389+
public void listen(Message<MyPojo> message, MyPojo pojo, MessageHeaders headers, Acknowledgement ack, Visibility visibility, QueueAttributes queueAttributes, software.amazon.awssdk.services.sqs.model.Message originalMessage) {
390390
Assert.notNull(message);
391391
Assert.notNull(pojo);
392392
Assert.notNull(headers);
393393
Assert.notNull(ack);
394-
Assert.notNull(asyncAck);
395394
Assert.notNull(visibility);
396395
Assert.notNull(queueAttributes);
397396
Assert.notNull(originalMessage);
398397
}
399398
----
400399

401-
IMPORTANT: Currently, batch listeners only support `List<MyPojo>` and `List<Message<MyPojo>>` method arguments.
402-
Other arguments can be found as headers in the `Message<MyPojo>` instances and can be extracted through the `getHeader` method.
400+
IMPORTANT: Batch listeners support a single `List<MyPojo>` and `List<Message<MyPojo>>` method arguments, and an optional `BatchAcknowledgement` or `AsyncBatchAcknowledgement` arguments.
401+
`MessageHeaders` should be extracted from the `Message` instances through the `getHeaders()` method.
403402

404403
==== Batch Processing
405404

406405
All message processing interfaces have both `single message` and `batch` methods.
407-
This means the same set of components can be used to process both single and batch methods, and can share logic where applicable.
406+
This means the same set of components can be used to process both single and batch methods, and share logic where applicable.
408407

409408
When batch mode is enabled, the framework will serve the entire result of a poll to the listener.
409+
If a value greater than 10 is provided for `maxMessagesPerPoll`, the result of multiple polls will be combined and up to the respective amount of messages will be served to the listener.
410410

411-
To enable batch processing using `@SqsListener`, declare a `List<MyPojo>` or `List<Message<MyPojo>>` method argument in the listener method.
411+
To enable batch processing using `@SqsListener`, a single `List<MyPojo>` or `List<Message<MyPojo>>` method argument should be provided in the listener method.
412+
The listener method can also have an optional `BatchAcknowledgement` argument for `AcknowledgementMode.MANUAL`.
412413

413-
IMPORTANT: Currently, when declaring batch mode this way, no other arguments can be added to the method signature.
414-
If any metadata is required, the `List<Message<MyPojo>>` variant should be used, then the headers can be checked to retrieve such information.
415-
The `SqsHeaders.SQS_ACKNOWLEDGMENT_CALLBACK_HEADER` will contain the `AcknowldgementCallback` you can use to manually acknowledge the messages in `AcknowledgementMode.MANUAL`.
416-
If acknowledgement batching is being used, acknowledgements will be batched instead of executing immediately.
417-
See <<Acknowledging Messages>> for more information on `Acknowledging messages`
418-
419-
To configure a batch processing at factory or container level, set `MessageDeliveryStrategy.BATCH` in the `ContainerOptions`, in the factory or container.
420-
This will affect manually created containers.
421-
Containers created from `@SqsListener` annotations will override this setting with whether they contain a `List<Pojoj>` or `List<Message<Pojo>>` argument.
414+
Alternatively, `ContainerOptions` can be set to `ListenerMode.BATCH` in the `ContainerOptions` in the factory or container.
422415

423416
NOTE: The same factory can be used to create both `single message` and `batch` containers for `@SqsListener` methods.
424417

425418
IMPORTANT: In case the same factory is shared by both delivery methods, any supplied `ErrorHandler`, `MessageInterceptor` or `MessageListener` should implement the proper methods.
426419

427-
IMPORTANT: If batch mode is enabled, make sure all components being used have the necessary `batch` methods implemented, otherwise an error may occur.
428420

429421
==== Container Options
430422

@@ -512,9 +504,12 @@ For batching acknowledgements a message is considered as no longer inflight when
512504
See <<Acknowledging Messages>>.
513505

514506
|<<maxMessagesPerPoll>>
515-
|1 - 10
507+
|1 - `Integer.MAX_VALUE`
516508
|10
517509
|The maximum number of messages that will be received by a poll to a SQS queue in this container.
510+
If a value greater than 10 is provided, the result of multiple polls
511+
will be combined, which can be useful for batch listeners.
512+
518513
See AWS documentation for more information.
519514

520515
|<<pollTimeout>>
@@ -543,7 +538,7 @@ See <<Container Lifecycle>>.
543538
|Configures the backpressure strategy to be used by the container.
544539
See <<Configuring BackPressureMode>>.
545540

546-
|`messageDeliveryStrategy`
541+
|`listenerMode`
547542
|`SINGLE_MESSAGE`, `BATCH`
548543
|`SINGLE_MESSAGE`
549544
|Configures whether this container will use `single message` or `batch` listeners.
@@ -996,11 +991,11 @@ public class SqsApplication {
996991
}
997992
----
998993

999-
1000994
=== Acknowledging Messages
1001995

1002996
In `SQS` acknowledging a message is the same as deleting the message from the queue.
1003997
A number of `Acknowledgement` strategies are available and can be configured via `ContainerOptions`.
998+
Optionally, a callback action can be added to be executed after either a successful or failed acknowledgement.
1004999

10051000
Here's an example of a possible configuration:
10061001

@@ -1052,14 +1047,72 @@ IMPORTANT: If an immediate acknowledging triggers an error, message processing i
10521047

10531048
==== Manual Acknowledgement
10541049

1050+
Acknowledgements can be handled manually by setting `AcknowledgementMode.MANUAL` in the `ContainerOptions`.
1051+
10551052
Manual acknowledgement can be used in conjunction with acknowledgement batching - the message will be queued for acknowledgement but won't be executed until one of the above criteria is met.
10561053

10571054
It can also be used in conjunction with immediate acknowledgement.
10581055

1056+
The following arguments can be used in listener methods to manually acknowledge:
1057+
1058+
===== `Acknowledgement`
1059+
1060+
The `Acknowledgement` interface can be used to acknowledge messages in `ListenerMode.SINGLE_MESSAGE`.
1061+
1062+
```java
1063+
public interface Acknowledgement {
1064+
1065+
/**
1066+
* Acknowledge the message.
1067+
*/
1068+
void acknowledge();
1069+
1070+
/**
1071+
* Asynchronously acknowledge the message.
1072+
*/
1073+
CompletableFuture<Void> acknowledgeAsync();
1074+
1075+
}
1076+
```
1077+
1078+
===== `BatchAcknowledgement`
1079+
1080+
The `BatchAcknowledgement` interface can be used to acknowledge messages in `ListenerMode.BATCH`.
1081+
1082+
The `acknowledge(Collection<Message<T>)` method enables acknowledging partial batches.
1083+
1084+
```java
1085+
public interface BatchAcknowledgement<T> {
1086+
1087+
/**
1088+
* Acknowledge all messages from the batch.
1089+
*/
1090+
void acknowledge();
1091+
1092+
/**
1093+
* Asynchronously acknowledge all messages from the batch.
1094+
*/
1095+
CompletableFuture<Void> acknowledgeAsync();
1096+
1097+
/**
1098+
* Acknowledge the provided messages.
1099+
*/
1100+
void acknowledge(Collection<Message<T>> messagesToAcknowledge);
1101+
1102+
/**
1103+
* Asynchronously acknowledge the provided messages.
1104+
*/
1105+
CompletableFuture<Void> acknowledgeAsync(Collection<Message<T>> messagesToAcknowledge);
1106+
1107+
}
1108+
```
1109+
10591110
==== Acknowledgement Ordering
10601111

1061-
- `PARALLEL` - Acknowledges the messages as soon as one of the above criterias is met - many acknowledgement calls can be made in parallel.
1062-
- `ORDERED` - One batch of acknowledgements will only be executed after the previous one is completed, ensuring `FIFO` ordering of acknowledgements.
1112+
- `PARALLEL` - Acknowledges the messages as soon as one of the above criteria is met - many acknowledgement calls can be made in parallel.
1113+
- `ORDERED` - One batch of acknowledgements will be executed after the previous one is completed, ensuring `FIFO` ordering for `batching` acknowledgements.
1114+
- `ORDERED_BY_GROUP` - One batch of acknowledgements will be executed after the previous one for the same group is completed, ensuring `FIFO` ordering of acknowledgements with parallelism between message groups.
1115+
Only available for `FIFO` queues.
10631116

10641117

10651118
==== Acknowledgement Defaults
@@ -1078,15 +1131,62 @@ The defaults for acknowledging differ for `Standard` and `FIFO` SQS queues.
10781131

10791132
NOTE: PARALLEL is the default for FIFO because ordering is guaranteed for processing.
10801133
This assures no messages from a given `MessageGroup` will be polled until the previous batch is acknowledged.
1134+
Implementations of this interface will be executed after an acknowledgement execution completes with either success or failure.
1135+
1136+
==== Acknowledgement Result Callback
1137+
1138+
The framework offers the `AcknowledgementResultCallback` and `AsyncAcknowledgementCallback` interfaces that can be added to a `SqsMessageListenerContainer` or `SqsMessageListenerContainerFactory`.
1139+
1140+
```java
1141+
public interface AcknowledgementResultCallback<T> {
1142+
1143+
default void onSuccess(Collection<Message<T>> messages) {
1144+
}
1145+
1146+
default void onFailure(Collection<Message<T>> messages, Throwable t) {
1147+
}
1148+
1149+
}
1150+
```
1151+
1152+
```java
1153+
public interface AsyncAcknowledgementResultCallback<T> {
1154+
1155+
default CompletableFuture<Void> onSuccess(Collection<Message<T>> messages) {
1156+
return CompletableFuture.completedFuture(null);
1157+
}
1158+
1159+
default CompletableFuture<Void> onFailure(Collection<Message<T>> messages, Throwable t) {
1160+
return CompletableFuture.completedFuture(null);
1161+
}
1162+
1163+
}
1164+
```
1165+
1166+
```java
1167+
@Bean
1168+
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
1169+
return SqsMessageListenerContainerFactory
1170+
.builder()
1171+
.sqsAsyncClient(sqsAsyncClient)
1172+
.acknowledgementResultCallback(getAcknowledgementResultCallback())
1173+
.build();
1174+
}
1175+
```
1176+
1177+
NOTE: When `immediate acknowledgement` is set, as is the default for `FIFO` queues, the callback will be executed **before** the next message in the batch is processed, and next message processing will wait for the callback completion.
1178+
This can be useful for taking action such as retrying to delete the messages, or stopping the container to prevent duplicate processing in case an acknowledgement fails in a FIFO queue.
1179+
For `batch parallel processing`, as is the default for `Standard` queues the callback execution happens asynchronously.
1180+
10811181

10821182
=== Global Configuration for @SqsListeners
10831183

1084-
A set of configurations can be set for all containers from `@SqsListener` by providing `SqsListenerCustomizer` beans.
1184+
A set of configurations can be set for all containers from `@SqsListener` by providing `SqsListenerConfigurer` beans.
10851185

10861186
[source, java]
10871187
----
10881188
@FunctionalInterface
1089-
public interface SqsListenerCustomizer {
1189+
public interface SqsListenerConfigurer {
10901190
10911191
void configure(EndpointRegistrar registrar);
10921192
@@ -1111,12 +1211,12 @@ A simple example would be:
11111211
[source, java]
11121212
----
11131213
@Bean
1114-
SqsListenerCustomizer customizer(ObjectMapper objectMapper) {
1214+
SqsListenerConfigurer configurer(ObjectMapper objectMapper) {
11151215
return registrar -> registrar.setObjectMapper(objectMapper);
11161216
}
11171217
----
11181218

1119-
NOTE: Many `SqsListenerCustomizer` beans can be registered in the context.
1219+
NOTE: Any number of `SqsListenerConfigurer` beans can be registered in the context.
11201220
All instances will be looked up at application startup and iterated through.
11211221

11221222
=== Message Processing Throughput
@@ -1138,7 +1238,7 @@ When using immediate acknowledgement, a message is considered as no longer infli
11381238

11391239

11401240
===== maxMessagesPerPoll
1141-
Set in `ContainerOptions`.
1241+
Set in `ContainerOptions` or the `@SqsListener` annotation.
11421242
Represents the maximum number of messages returned by a single poll to a SQS queue, to a maximum of 10.
11431243
This value has to be less than or equal to `maxInflightMessagesPerQueue`.
11441244
Defaults to 10.
@@ -1209,11 +1309,10 @@ Otherwise, a `sync` interface should be used.
12091309

12101310
==== Providing a TaskExecutor
12111311

1212-
The default `TaskExecutor` is a `ThreadPoolTaskExecutor`, and a different `componentTaskExecutor` can be set in the `ContainerOptions`.
1312+
The default `TaskExecutor` is a `ThreadPoolTaskExecutor`, and a different `componentTaskExecutor` supplier can be set in the `ContainerOptions`.
12131313

12141314
When providing a custom executor, it's important that it's configured to support all threads that will be created, which should be (maxInflightMessagesPerQueue * total number of queues).
1215-
When set as a `MessageListenerContainerFactory` options, it's important to consider all the containers it will be applied to.
1216-
Also, to avoid excessive thread hopping, a `MessageExecutionThreadFactory` should be set to the executor.
1315+
Also, to avoid unnecessary thread hopping between blocking components, a `MessageExecutionThreadFactory` should be set to the executor.
12171316

12181317
If setting the `ThreadFactory` is not possible, it's advisable to allow for extra threads in the thread pool to account for the time between a new thread is requested and the previous thread is released.
12191318

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
2222
import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
2323
import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration;
24-
import io.awspring.cloud.sqs.config.SqsListenerCustomizer;
24+
import io.awspring.cloud.sqs.config.SqsListenerConfigurer;
2525
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
2626
import io.awspring.cloud.sqs.listener.ContainerOptions;
2727
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
@@ -49,32 +49,37 @@
4949
* @since 3.0
5050
*/
5151
@Configuration(proxyBeanMethods = false)
52-
@ConditionalOnClass({ SqsAsyncClient.class })
53-
@EnableConfigurationProperties({ SqsProperties.class })
52+
@ConditionalOnClass(SqsAsyncClient.class)
53+
@EnableConfigurationProperties(SqsProperties.class)
5454
@Import(SqsBootstrapConfiguration.class)
5555
@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class })
5656
@ConditionalOnProperty(name = "spring.cloud.aws.sqs.enabled", havingValue = "true", matchIfMissing = true)
5757
public class SqsAutoConfiguration {
5858

59+
private final SqsProperties sqsProperties;
60+
61+
public SqsAutoConfiguration(SqsProperties sqsProperties) {
62+
this.sqsProperties = sqsProperties;
63+
}
64+
5965
@ConditionalOnMissingBean
6066
@Bean
61-
public SqsAsyncClient sqsAsyncClient(SqsProperties properties,
62-
AwsClientBuilderConfigurer awsClientBuilderConfigurer,
67+
public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilderConfigurer,
6368
ObjectProvider<AwsClientCustomizer<SqsAsyncClientBuilder>> configurer) {
64-
return awsClientBuilderConfigurer.configure(SqsAsyncClient.builder(), properties, configurer.getIfAvailable())
65-
.build();
69+
return awsClientBuilderConfigurer
70+
.configure(SqsAsyncClient.builder(), this.sqsProperties, configurer.getIfAvailable()).build();
6671
}
6772

6873
@ConditionalOnMissingBean
6974
@Bean
70-
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsProperties sqsProperties,
75+
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(
7176
ObjectProvider<SqsAsyncClient> sqsAsyncClient, ObjectProvider<AsyncErrorHandler<Object>> asyncErrorHandler,
7277
ObjectProvider<ErrorHandler<Object>> errorHandler,
7378
ObjectProvider<AsyncMessageInterceptor<Object>> asyncInterceptors,
7479
ObjectProvider<MessageInterceptor<Object>> interceptors) {
7580

7681
SqsMessageListenerContainerFactory<Object> factory = new SqsMessageListenerContainerFactory<>();
77-
factory.configure(options -> configureContainerOptions(options, sqsProperties));
82+
factory.configure(this::configureContainerOptions);
7883
sqsAsyncClient.ifAvailable(factory::setSqsAsyncClient);
7984
asyncErrorHandler.ifAvailable(factory::setErrorHandler);
8085
errorHandler.ifAvailable(factory::setErrorHandler);
@@ -83,16 +88,16 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
8388
return factory;
8489
}
8590

86-
private void configureContainerOptions(ContainerOptions.Builder options, SqsProperties sqsProperties) {
91+
private void configureContainerOptions(ContainerOptions.Builder options) {
8792
PropertyMapper mapper = PropertyMapper.get().alwaysApplyingWhenNonNull();
88-
mapper.from(sqsProperties.getListener().getMaxInflightMessagesPerQueue())
93+
mapper.from(this.sqsProperties.getListener().getMaxInflightMessagesPerQueue())
8994
.to(options::maxInflightMessagesPerQueue);
90-
mapper.from(sqsProperties.getListener().getMaxMessagesPerPoll()).to(options::maxMessagesPerPoll);
91-
mapper.from(sqsProperties.getListener().getPollTimeout()).to(options::pollTimeout);
95+
mapper.from(this.sqsProperties.getListener().getMaxMessagesPerPoll()).to(options::maxMessagesPerPoll);
96+
mapper.from(this.sqsProperties.getListener().getPollTimeout()).to(options::pollTimeout);
9297
}
9398

9499
@Bean
95-
public SqsListenerCustomizer objectMapperCustomizer(ObjectProvider<ObjectMapper> objectMapperProvider) {
100+
public SqsListenerConfigurer objectMapperCustomizer(ObjectProvider<ObjectMapper> objectMapperProvider) {
96101
ObjectMapper objectMapper = objectMapperProvider.getIfUnique();
97102
return registrar -> {
98103
if (registrar.getObjectMapper() == null && objectMapper != null) {

0 commit comments

Comments
 (0)