4242import com .google .api .gax .rpc .ApiCallContext ;
4343import com .google .api .gax .rpc .UnaryCallable ;
4444import com .google .common .annotations .VisibleForTesting ;
45+ import com .google .common .base .MoreObjects ;
4546import com .google .common .base .Preconditions ;
4647import com .google .common .base .Stopwatch ;
4748import com .google .common .util .concurrent .Futures ;
5152import java .lang .ref .WeakReference ;
5253import java .util .ArrayList ;
5354import java .util .List ;
55+ import java .util .Optional ;
56+ import java .util .StringJoiner ;
5457import java .util .concurrent .ConcurrentHashMap ;
5558import java .util .concurrent .ConcurrentMap ;
5659import java .util .concurrent .ExecutionException ;
5760import java .util .concurrent .Future ;
5861import java .util .concurrent .ScheduledExecutorService ;
5962import java .util .concurrent .TimeUnit ;
60- import java .util .concurrent .atomic . AtomicInteger ;
63+ import java .util .concurrent .TimeoutException ;
6164import java .util .logging .Level ;
6265import java .util .logging .Logger ;
66+ import javax .annotation .Nonnull ;
6367import javax .annotation .Nullable ;
68+ import org .threeten .bp .Duration ;
6469
6570/**
6671 * Queues up the elements until {@link #flush()} is called; once batching is over, returned future
@@ -86,7 +91,7 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
8691 private final BatcherReference currentBatcherReference ;
8792
8893 private Batch <ElementT , ElementResultT , RequestT , ResponseT > currentOpenBatch ;
89- private final AtomicInteger numOfOutstandingBatches = new AtomicInteger ( 0 );
94+ private final ConcurrentMap < Batch < ElementT , ElementResultT , RequestT , ResponseT >, Boolean > outstandingBatches = new ConcurrentHashMap <>( );
9095 private final Object flushLock = new Object ();
9196 private final Object elementLock = new Object ();
9297 private final Future <?> scheduledFuture ;
@@ -297,8 +302,10 @@ public void sendOutstanding() {
297302 } catch (Exception ex ) {
298303 batchResponse = ApiFutures .immediateFailedFuture (ex );
299304 }
305+ accumulatedBatch .setOperation (batchResponse );
306+
307+ outstandingBatches .put (accumulatedBatch , Boolean .TRUE );
300308
301- numOfOutstandingBatches .incrementAndGet ();
302309 ApiFutures .addCallback (
303310 batchResponse ,
304311 new ApiFutureCallback <ResponseT >() {
@@ -310,7 +317,7 @@ public void onSuccess(ResponseT response) {
310317 accumulatedBatch .resource .getByteCount ());
311318 accumulatedBatch .onBatchSuccess (response );
312319 } finally {
313- onBatchCompletion ();
320+ onBatchCompletion (accumulatedBatch );
314321 }
315322 }
316323
@@ -322,18 +329,19 @@ public void onFailure(Throwable throwable) {
322329 accumulatedBatch .resource .getByteCount ());
323330 accumulatedBatch .onBatchFailure (throwable );
324331 } finally {
325- onBatchCompletion ();
332+ onBatchCompletion (accumulatedBatch );
326333 }
327334 }
328335 },
329336 directExecutor ());
330337 }
331338
332- private void onBatchCompletion () {
339+ private void onBatchCompletion (Batch < ElementT , ElementResultT , RequestT , ResponseT > batch ) {
333340 boolean shouldClose = false ;
334341
335342 synchronized (flushLock ) {
336- if (numOfOutstandingBatches .decrementAndGet () == 0 ) {
343+ outstandingBatches .remove (batch );
344+ if (outstandingBatches .isEmpty ()) {
337345 flushLock .notifyAll ();
338346 shouldClose = closeFuture != null ;
339347 }
@@ -349,22 +357,37 @@ private void onBatchCompletion() {
349357 }
350358
351359 private void awaitAllOutstandingBatches () throws InterruptedException {
352- while (numOfOutstandingBatches . get () > 0 ) {
360+ while (! outstandingBatches . isEmpty () ) {
353361 synchronized (flushLock ) {
354362 // Check again under lock to avoid racing with onBatchCompletion
355- if (numOfOutstandingBatches . get () == 0 ) {
363+ if (outstandingBatches . isEmpty () ) {
356364 break ;
357365 }
358366 flushLock .wait ();
359367 }
360368 }
361369 }
362370
371+ @ Override
372+ public void cancelOutstanding () {
373+ for (Batch <?,?,?,?> batch : outstandingBatches .keySet ()) {
374+ batch .cancel ();
375+ }
376+ }
363377 /** {@inheritDoc} */
364378 @ Override
365379 public void close () throws InterruptedException {
380+ close (null );
381+ }
382+
383+ @ Override
384+ public void close (@ Nullable Duration timeout ) throws InterruptedException {
366385 try {
367- closeAsync ().get ();
386+ if (timeout != null ) {
387+ closeAsync ().get (timeout .toMillis (), TimeUnit .MILLISECONDS );
388+ } else {
389+ closeAsync ().get ();
390+ }
368391 } catch (ExecutionException e ) {
369392 // Original stacktrace of a batching exception is not useful, so rethrow the error with
370393 // the caller stacktrace
@@ -374,6 +397,16 @@ public void close() throws InterruptedException {
374397 } else {
375398 throw new IllegalStateException ("unexpected error closing the batcher" , e .getCause ());
376399 }
400+ } catch (TimeoutException e ) {
401+ StringJoiner batchesStr = new StringJoiner ("," );
402+ for (Batch <ElementT , ElementResultT , RequestT , ResponseT > batch : outstandingBatches .keySet ()) {
403+ batchesStr .add (batch .toString ());
404+ }
405+ String msg = "Timed out trying to close batcher after " + timeout + "." ;
406+ msg += " Batch request prototype: " + prototype + "." ;
407+ msg += " Outstanding batches: " + batchesStr ;
408+
409+ throw new BatchingException (msg );
377410 }
378411 }
379412
@@ -393,7 +426,7 @@ public ApiFuture<Void> closeAsync() {
393426 // prevent admission of new elements
394427 closeFuture = SettableApiFuture .create ();
395428 // check if we can close immediately
396- closeImmediately = numOfOutstandingBatches . get () == 0 ;
429+ closeImmediately = outstandingBatches . isEmpty () ;
397430 }
398431
399432 // Clean up accounting
@@ -435,6 +468,8 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
435468 private long totalThrottledTimeMs = 0 ;
436469 private BatchResource resource ;
437470
471+ private ApiFuture <ResponseT > operation ;
472+
438473 private Batch (
439474 RequestT prototype ,
440475 BatchingDescriptor <ElementT , ElementResultT , RequestT , ResponseT > descriptor ,
@@ -457,6 +492,18 @@ void add(
457492 totalThrottledTimeMs += throttledTimeMs ;
458493 }
459494
495+ void setOperation (@ Nonnull ApiFuture <ResponseT > operation ) {
496+ Preconditions .checkNotNull (operation );
497+ this .operation = operation ;
498+ }
499+
500+ void cancel () {
501+ if (this .operation != null ) {
502+ this .operation .cancel (true );
503+ }
504+ }
505+
506+
460507 void onBatchSuccess (ResponseT response ) {
461508 try {
462509 descriptor .splitResponse (response , entries );
@@ -480,6 +527,22 @@ void onBatchFailure(Throwable throwable) {
480527 boolean isEmpty () {
481528 return resource .getElementCount () == 0 ;
482529 }
530+
531+ @ Override
532+ public String toString () {
533+ StringJoiner elementsStr = new StringJoiner ("," );
534+ for (BatchEntry <ElementT , ElementResultT > entry : entries ) {
535+ elementsStr .add (
536+ Optional .ofNullable (entry .getElement ())
537+ .map (Object ::toString )
538+ .orElse ("null" )
539+ );
540+ }
541+ return MoreObjects .toStringHelper (this )
542+ .add ("operation" , operation )
543+ .add ("elements" , elementsStr )
544+ .toString ();
545+ }
483546 }
484547
485548 /**
0 commit comments