11package io .confluent .parallelconsumer ;
22
3- import io .confluent .TransactionState ;
43import lombok .SneakyThrows ;
54import lombok .extern .slf4j .Slf4j ;
65import org .apache .kafka .clients .consumer .Consumer ;
1211import org .apache .kafka .common .TopicPartition ;
1312
1413import java .lang .reflect .Field ;
14+ import java .lang .reflect .Method ;
1515import java .time .Duration ;
1616import java .util .Map ;
1717import java .util .concurrent .Future ;
18- import java .util .concurrent .locks .Lock ;
1918import java .util .concurrent .locks .ReentrantReadWriteLock ;
2019
2120import static io .confluent .csid .utils .StringUtils .msg ;
@@ -26,8 +25,6 @@ public class ProducerManager<K, V> extends AbstractOffsetCommitter<K, V> impleme
2625 protected final Producer <K , V > producer ;
2726 private final ParallelConsumerOptions options ;
2827
29- protected final TransactionState ts = new TransactionState ();
30-
3128 /**
3229 * The {@link KafkaProducer} isn't actually completely thread safe, at least when using it transactionally. We must
3330 * be careful not to send messages to the producer, while we are committing a transaction - "Cannot call send in
@@ -57,7 +54,6 @@ private void initProducer(final Producer<K, V> newProducer) {
5754 log .debug ("Initialising producer transaction session..." );
5855 producer .initTransactions ();
5956 producer .beginTransaction ();
60- ts .setInTransaction ();
6157 } catch (KafkaException e ) {
6258 log .error ("Make sure your producer is setup for transactions - specifically make sure it's {} is set." , ProducerConfig .TRANSACTIONAL_ID_CONFIG , e );
6359 throw e ;
@@ -76,16 +72,21 @@ private void initProducer(final Producer<K, V> newProducer) {
7672 */
7773 @ SneakyThrows
7874 private boolean getProducerIsTransactional (final Producer <K , V > newProducer ) {
79- Field coordinatorField = newProducer .getClass ().getDeclaredField ("transactionManager" );
80- coordinatorField .setAccessible (true );
81- TransactionManager transactionManager = (TransactionManager ) coordinatorField .get (newProducer );
75+ TransactionManager transactionManager = getTransactionManager (newProducer );
8276 if (transactionManager == null ) {
8377 return false ;
8478 } else {
8579 return transactionManager .isTransactional ();
8680 }
8781 }
8882
83+ private TransactionManager getTransactionManager (final Producer <K , V > newProducer ) throws NoSuchFieldException , IllegalAccessException {
84+ Field coordinatorField = newProducer .getClass ().getDeclaredField ("transactionManager" );
85+ coordinatorField .setAccessible (true );
86+ TransactionManager transactionManager = (TransactionManager ) coordinatorField .get (newProducer );
87+ return transactionManager ;
88+ }
89+
8990 /**
9091 * Produce a message back to the broker.
9192 * <p>
@@ -152,15 +153,31 @@ protected void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offset
152153 ReentrantReadWriteLock .WriteLock writeLock = producerCommitLock .writeLock ();
153154 writeLock .lock ();
154155 try {
155- producer .commitTransaction ();
156+ boolean retrying = retryCount > 0 ;
157+ if (retrying ) {
158+ if (isTransactionCompleting ()) {
159+ // try wait again
160+ producer .commitTransaction ();
161+ }
162+ if (isTransactionReady ()) {
163+ // tx has completed since we last tried, start a new one
164+ producer .beginTransaction ();
165+ }
166+ boolean ready = (lastErrorSavedForRethrow != null ) ? !lastErrorSavedForRethrow .getMessage ().contains ("Invalid transition attempted from state READY to state COMMITTING_TRANSACTION" ) : true ;
167+ if (ready ) {
168+ // try again
169+ log .error ("Was already ready - tx completed between interrupt and retry" );
170+ }
171+ } else {
172+ // happy path
173+ producer .commitTransaction ();
174+ producer .beginTransaction ();
175+ }
156176 } finally {
157177 writeLock .unlock ();
158178 }
159-
160179 }
161180
162- ts .setNotInTransaction ();
163-
164181 onOffsetCommitSuccess (offsetsToSend );
165182
166183 committed = true ;
@@ -175,18 +192,42 @@ protected void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offset
175192 }
176193 }
177194
178- @ Override
179- protected void afterCommit () {
180- // begin tx for next cycle
181- producer .beginTransaction ();
182- ts .setInTransaction ();
195+ /**
196+ * TODO talk about alternatives to this brute force approach for retrying committing transactions
197+ */
198+ @ SneakyThrows
199+ private boolean isTransactionCompleting () {
200+ TransactionManager transactionManager = getTransactionManager (producer );
201+ Method method = transactionManager .getClass ().getDeclaredMethod ("isCompleting" );
202+ method .setAccessible (true );
203+ return (boolean ) method .invoke (transactionManager );
204+ }
205+
206+ /**
207+ * TODO talk about alternatives to this brute force approach for retrying committing transactions
208+ */
209+ @ SneakyThrows
210+ private boolean isTransactionReady () {
211+ TransactionManager transactionManager = getTransactionManager (producer );
212+ Method method = transactionManager .getClass ().getDeclaredMethod ("isReady" );
213+ method .setAccessible (true );
214+ return (boolean ) method .invoke (transactionManager );
183215 }
184216
217+ /**
218+ * Assumes the system is drained at this point, or draining is not desired.
219+ */
185220 public void close (final Duration timeout ) {
186221 log .debug ("Closing producer, assuming no more in flight..." );
187- if (options .isUsingTransactionalProducer () && ts .isInTransaction ()) {
188- // close started after tx began, but before work was done, otherwise a tx wouldn't have been started
189- producer .abortTransaction ();
222+ if (options .isUsingTransactionalProducer () && !isTransactionReady ()) {
223+ ReentrantReadWriteLock .WriteLock writeLock = producerCommitLock .writeLock ();
224+ writeLock .lock ();
225+ try {
226+ // close started after tx began, but before work was done, otherwise a tx wouldn't have been started
227+ producer .abortTransaction ();
228+ } finally {
229+ writeLock .unlock ();
230+ }
190231 }
191232 producer .close (timeout );
192233 }
0 commit comments