
Commit 7957e96

docs: Retry documentation (#194)
1 parent 84cf11a commit 7957e96

File tree

6 files changed: +339 -7 lines changed

.idea/runConfigurations/asciidoc_template_build.xml

Lines changed: 27 additions & 0 deletions

.idea/runConfigurations/license_format.xml

Lines changed: 27 additions & 0 deletions

README.adoc

Lines changed: 120 additions & 1 deletion
@@ -644,6 +644,124 @@ image::https://lucid.app/publicSegments/view/f7a05e99-24e6-4ea3-b3d0-978e306aa56
Even during retries, offsets will always be committed only after successful processing, and in order.

== Retries

If processing of a record fails, the record will be placed back into its queue and retried after a configurable delay (see the `ParallelConsumerOptions` class).
Ordering guarantees will always be adhered to, regardless of failure.

A failure is denoted by *any* exception being thrown from the user's processing function.
The system catches these exceptions, logs them, and places the record back in the queue for processing later.
All types of exceptions thrown are considered retriable.
To not retry a record, do not throw an exception from your processing function.

If for some reason you want to proactively fail a record - without relying on some other system throwing an exception which you don't catch - simply throw an exception of your own design, which the system will treat the same way.
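For instance - a minimal sketch of a proactive failure, where `isKnownBad` is a hypothetical validation helper (any exception type will do, as they are all treated as retriable):

[source,java,indent=0]
----
pc.poll(consumerRecord -> {
    if (isKnownBad(consumerRecord)) { // hypothetical validation helper
        // any exception puts the record back in the queue for retry
        throw new RuntimeException("Proactively failing record at offset " + consumerRecord.offset());
    }
    processRecord(consumerRecord);
});
----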

To configure the retry delay, see `ParallelConsumerOptions#defaultMessageRetryDelay`.
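
For example, a fixed five second delay could be set through the options builder - a minimal sketch, assuming the Lombok-generated builder method for the `defaultMessageRetryDelay` field:

[source,java,indent=0]
----
ParallelConsumerOptions.<String, String>builder()
        .defaultMessageRetryDelay(Duration.ofSeconds(5)) // retry failed records after 5 seconds
        .build();
----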

At the moment there is no terminal error support, so messages will continue to be retried for as long as an exception continues to be thrown from the user function (see <<skipping-records>>).
This will not hold up the queues in `KEY` or `UNORDERED` modes; however, in `PARTITION` mode it *will* block progress.
Offsets will also continue to be committed (see <<commit-mode>> and <<Offset Map>>).

=== Retry Delay Function

As part of the https://github.com/confluentinc/parallel-consumer/issues/65[enhanced retry epic], the ability to https://github.com/confluentinc/parallel-consumer/issues/82[dynamically determine the retry delay] was added.
This can be used to customise the retry delay per record - for example, to implement exponential back off, to use different delays for different types of records, or to derive the delay from the status of an external system.

You can access the retry count of a record through its wrapped `WorkContainer` class, which is the input variable to the retry delay function.

.Example retry delay function implementing exponential backoff
[source,java,indent=0]
----
final double multiplier = 2;
final int baseDelaySecond = 1;

ParallelConsumerOptions.<String, String>builder()
        .retryDelayProvider(workContainer -> {
            int numberOfFailedAttempts = workContainer.getNumberOfFailedAttempts();
            // baseDelaySecond * multiplier^attempts gives delays of 1s, 2s, 4s, 8s...
            long delayMillis = (long) (baseDelaySecond * Math.pow(multiplier, numberOfFailedAttempts) * 1000);
            return Duration.ofMillis(delayMillis);
        });
----

[[skipping-records]]
=== Skipping Records

If for whatever reason you want to skip a record, simply do not throw an exception, or catch any exception thrown, log and swallow it, and return from the user function normally.
The system will treat this as a record processing success, mark the record as completed, and move on as though it were normal operation.

A user may choose to skip a record, for example, if it has been retried too many times, or if the record is invalid or doesn't need processing.

Implementing a https://github.com/confluentinc/parallel-consumer/issues/196[max retries feature] as a part of the system is planned.

.Example of skipping a record after a maximum number of retries is reached
[source,java,indent=0]
----
final int maxRetries = 10;
final Map<ConsumerRecord<String, String>, Long> retriesCount = new ConcurrentHashMap<>();

pc.poll(consumerRecord -> {
    Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L);
    if (retryCount < maxRetries) {
        try {
            processRecord(consumerRecord);
            // no exception, so completed - remove from map
            retriesCount.remove(consumerRecord);
        } catch (RuntimeException e) {
            // failed - increment the count and rethrow to trigger a retry
            retriesCount.put(consumerRecord, retryCount + 1);
            throw e;
        }
    } else {
        log.warn("Retry count {} exceeded max of {} for record {}", retryCount, maxRetries, consumerRecord);
        // giving up, remove from map
        retriesCount.remove(consumerRecord);
    }
});
----

=== Circuit Breaker Pattern

Although the system doesn't have an https://github.com/confluentinc/parallel-consumer/issues/110[explicit circuit breaker pattern feature], one can be created by combining the custom retry delay function with proactive failure.
For example, the retry delay can be calculated based upon the status of an external system - i.e. if the external system is currently out of action, use a longer retry delay.
Then in the processing function, again check the status of the external system first, and if it's still offline, throw an exception proactively without attempting to process the message.
This will put the message back in the queue.

.Example of a circuit breaker implementation
[source,java,indent=0]
----
final Map<String, Boolean> upMap = new ConcurrentHashMap<>();

pc.poll(consumerRecord -> {
    String serverId = extractServerId(consumerRecord);
    boolean up = upMap.computeIfAbsent(serverId, ignore -> true);

    if (!up) {
        up = updateStatusOfServer(serverId);
    }

    if (up) {
        try {
            processRecord(consumerRecord);
            // no exception, so set server status UP
            upMap.put(serverId, true);
        } catch (CircuitBreakingException e) {
            log.warn("Server {} is circuit broken, will retry record when server is up. Record: {}", serverId, consumerRecord);
            upMap.put(serverId, false);
            // rethrow so that the record is retried later
            throw e;
        }
    } else {
        log.warn("Server {} currently down, will retry record later: {}", serverId, consumerRecord);
        // fail proactively so that the record is placed back in the queue
        throw new CircuitBreakingException("Server " + serverId + " is down");
    }
});
----

=== Head of Line Blocking

In order to have a failing record not block progress of a partition, one of the ordering modes other than `PARTITION` must be used, so that the system is allowed to process other records - those with different keys in `KEY` mode, or, in the case of `UNORDERED` processing, any record.
This is because in `PARTITION` ordering mode, records are always processed strictly in partition order, so a failing record blocks everything behind it and head of line blocking cannot be avoided.
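
For example, `KEY` ordering could be selected through the options builder - a minimal sketch, assuming the builder exposes the `ordering` option for the `ProcessingOrder` enum:

[source,java,indent=0]
----
ParallelConsumerOptions.<String, String>builder()
        .ordering(ParallelConsumerOptions.ProcessingOrder.KEY) // a failing record only blocks records with the same key
        .build();
----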

=== Future Work

Improvements to this system are planned; see the following issues:

* https://github.com/confluentinc/parallel-consumer/issues/65[Enhanced retry epic #65]
* https://github.com/confluentinc/parallel-consumer/issues/48[Support scheduled message processing (scheduled retry)]
* https://github.com/confluentinc/parallel-consumer/issues/196[Provide option for max retries, and a call back when reached (potential DLQ) #196]
* https://github.com/confluentinc/parallel-consumer/issues/34[Monitor for progress and optionally shutdown (leave consumer group), skip message or send to DLQ #34]

== Result Models

* Void
@@ -659,6 +777,7 @@ When your function is actually run, a result object will be streamed back to you
After your operation completes, you can also choose to publish a result message back to Kafka.
The message publishing metadata can be streamed back to your client code.

[[commit-mode]]
== Commit Mode

The system gives you three choices for how to do offset commits.
@@ -668,7 +787,7 @@ The `transactional` mode is explained in the next section.

`Asynchronous` mode is faster, as it doesn't block the control loop.

- `Synchronous` will block the processing loop until a successful commit response is received, however, `Asynchronous` will still be capped by the max processing settings in the `Options` class.
+ `Synchronous` will block the processing loop until a successful commit response is received, however, `Asynchronous` will still be capped by the max processing settings in the `ParallelConsumerOptions` class.

If you're used to using the auto commit mode in the normal Kafka consumer, you can think of the `Asynchronous` mode being similar to this.
We suggest starting with this mode, and it is the default.
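
For illustration, a commit mode could be selected through the options builder - a minimal sketch, assuming a `commitMode` builder option and a `PERIODIC_CONSUMER_ASYNCHRONOUS` constant on the `CommitMode` enum:

[source,java,indent=0]
----
ParallelConsumerOptions.<String, String>builder()
        .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS) // assumed constant name
        .build();
----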

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java

Lines changed: 6 additions & 3 deletions
@@ -1,9 +1,8 @@
package io.confluent.parallelconsumer;

/*-
- * Copyright (C) 2020-2021 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
 */
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.state.WorkContainer;
import lombok.Builder;
@@ -147,13 +146,17 @@ public enum CommitMode {
    private final Duration defaultMessageRetryDelay = Duration.ofSeconds(1);

    /**
-     * When present, use this to generate the retry delay, instad of {@link #getDefaultMessageRetryDelay()}.
+     * When present, use this to generate the retry delay, instead of {@link #getDefaultMessageRetryDelay()}.
     * <p>
     * Overrides {@link #defaultMessageRetryDelay}, even if it's set.
     */
    @Builder.Default
    private final Function<WorkContainer, Duration> retryDelayProvider;

+    /**
+     * Dirty global access to the {@link #retryDelayProvider}.
+     */
+    // TODO remove need for writeable global access
    public static Function<WorkContainer, Duration> retryDelayProviderStatic;

    /**

parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java

Lines changed: 83 additions & 2 deletions
@@ -1,22 +1,25 @@
package io.confluent.parallelconsumer.examples.core;

/*-
- * Copyright (C) 2020 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
 */
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.concurrent.CircuitBreakingException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

+import java.time.Duration;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
import static pl.tlinkowski.unij.api.UniLists.of;
@@ -111,4 +114,82 @@ static class Result {
        String payload;
    }

    void customRetryDelay() {
        // tag::customRetryDelay[]
        final double multiplier = 2;
        final int baseDelaySecond = 1;

        ParallelConsumerOptions.<String, String>builder()
                .retryDelayProvider(workContainer -> {
                    int numberOfFailedAttempts = workContainer.getNumberOfFailedAttempts();
                    // baseDelaySecond * multiplier^attempts gives delays of 1s, 2s, 4s, 8s...
                    long delayMillis = (long) (baseDelaySecond * Math.pow(multiplier, numberOfFailedAttempts) * 1000);
                    return Duration.ofMillis(delayMillis);
                });
        // end::customRetryDelay[]
    }

    void maxRetries() {
        ParallelStreamProcessor<String, String> pc = ParallelStreamProcessor.createEosStreamProcessor(null);
        // tag::maxRetries[]
        final int maxRetries = 10;
        final Map<ConsumerRecord<String, String>, Long> retriesCount = new ConcurrentHashMap<>();

        pc.poll(consumerRecord -> {
            Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L);
            if (retryCount < maxRetries) {
                try {
                    processRecord(consumerRecord);
                    // no exception, so completed - remove from map
                    retriesCount.remove(consumerRecord);
                } catch (RuntimeException e) {
                    // failed - increment the count and rethrow to trigger a retry
                    retriesCount.put(consumerRecord, retryCount + 1);
                    throw e;
                }
            } else {
                log.warn("Retry count {} exceeded max of {} for record {}", retryCount, maxRetries, consumerRecord);
                // giving up, remove from map
                retriesCount.remove(consumerRecord);
            }
        });
        // end::maxRetries[]
    }

    private void processRecord(final ConsumerRecord<String, String> record) {
        // no-op
    }

    void circuitBreaker() {
        ParallelStreamProcessor<String, String> pc = ParallelStreamProcessor.createEosStreamProcessor(null);
        // tag::circuitBreaker[]
        final Map<String, Boolean> upMap = new ConcurrentHashMap<>();

        pc.poll(consumerRecord -> {
            String serverId = extractServerId(consumerRecord);
            boolean up = upMap.computeIfAbsent(serverId, ignore -> true);

            if (!up) {
                up = updateStatusOfServer(serverId);
            }

            if (up) {
                try {
                    processRecord(consumerRecord);
                    // no exception, so set server status UP
                    upMap.put(serverId, true);
                } catch (CircuitBreakingException e) {
                    log.warn("Server {} is circuit broken, will retry record when server is up. Record: {}", serverId, consumerRecord);
                    upMap.put(serverId, false);
                    // rethrow so that the record is retried later
                    throw e;
                }
            } else {
                log.warn("Server {} currently down, will retry record later: {}", serverId, consumerRecord);
                // fail proactively so that the record is placed back in the queue
                throw new CircuitBreakingException("Server " + serverId + " is down");
            }
        });
        // end::circuitBreaker[]
    }

    private boolean updateStatusOfServer(final String serverId) {
        return false;
    }

    private String extractServerId(final ConsumerRecord<String, String> consumerRecord) {
        // no-op
        return null;
    }
}
