3535import static org .apache .hadoop .hbase .util .ConcurrentMapUtils .computeIfAbsent ;
3636
3737import java .io .IOException ;
38+ import java .util .ArrayList ;
3839import java .util .Arrays ;
3940import java .util .HashSet ;
4041import java .util .Iterator ;
4142import java .util .LinkedHashMap ;
43+ import java .util .List ;
4244import java .util .Map ;
4345import java .util .Optional ;
4446import java .util .Set ;
@@ -122,6 +124,26 @@ public boolean equals(Object obj) {
122124 }
123125 }
124126
127+ private static final class RegionLocationsFutureResult {
128+ private final CompletableFuture <RegionLocations > future ;
129+ private final RegionLocations result ;
130+ private final Throwable e ;
131+
132+ public RegionLocationsFutureResult (CompletableFuture <RegionLocations > future ,
133+ RegionLocations result , Throwable e ) {
134+ this .future = future ;
135+ this .result = result ;
136+ this .e = e ;
137+ }
138+
139+ public void complete () {
140+ if (e != null ) {
141+ future .completeExceptionally (e );
142+ }
143+ future .complete (result );
144+ }
145+ }
146+
125147 private static final class TableCache {
126148
127149 private final ConcurrentNavigableMap <byte [], RegionLocations > cache =
@@ -148,18 +170,20 @@ public Optional<LocateRequest> getCandidate() {
148170 return allRequests .keySet ().stream ().filter (r -> !isPending (r )).findFirst ();
149171 }
150172
151- public void clearCompletedRequests (RegionLocations locations ) {
173+ public List <RegionLocationsFutureResult > clearCompletedRequests (RegionLocations locations ) {
174+ List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
152175 for (Iterator <Map .Entry <LocateRequest , CompletableFuture <RegionLocations >>> iter =
153176 allRequests .entrySet ().iterator (); iter .hasNext ();) {
154177 Map .Entry <LocateRequest , CompletableFuture <RegionLocations >> entry = iter .next ();
155- if (tryComplete (entry .getKey (), entry .getValue (), locations )) {
178+ if (tryComplete (entry .getKey (), entry .getValue (), locations , futureResultList )) {
156179 iter .remove ();
157180 }
158181 }
182+ return futureResultList ;
159183 }
160184
161185 private boolean tryComplete (LocateRequest req , CompletableFuture <RegionLocations > future ,
162- RegionLocations locations ) {
186+ RegionLocations locations , List < RegionLocationsFutureResult > futureResultList ) {
163187 if (future .isDone ()) {
164188 return true ;
165189 }
@@ -185,7 +209,7 @@ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations
185209 completed = loc .getRegion ().containsRow (req .row );
186210 }
187211 if (completed ) {
188- future . complete ( locations );
212+ futureResultList . add ( new RegionLocationsFutureResult ( future , locations , null ) );
189213 return true ;
190214 } else {
191215 return false ;
@@ -320,32 +344,36 @@ private void complete(TableName tableName, LocateRequest req, RegionLocations lo
320344 TableCache tableCache = getTableCache (tableName );
321345 if (locs != null ) {
322346 RegionLocations addedLocs = addToCache (tableCache , locs );
347+ List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
323348 synchronized (tableCache ) {
324349 tableCache .pendingRequests .remove (req );
325- tableCache .clearCompletedRequests (addedLocs );
350+ futureResultList . addAll ( tableCache .clearCompletedRequests (addedLocs ) );
326351 // Remove a complete locate request in a synchronized block, so the table cache must have
327352 // quota to send a candidate request.
328353 toSend = tableCache .getCandidate ();
329354 toSend .ifPresent (r -> tableCache .send (r ));
330355 }
356+ futureResultList .forEach (RegionLocationsFutureResult ::complete );
331357 toSend .ifPresent (r -> locateInMeta (tableName , r ));
332358 } else {
333359 // we meet an error
334360 assert error != null ;
361+ List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
335362 synchronized (tableCache ) {
336363 tableCache .pendingRequests .remove (req );
337364 // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
338365 // already retried several times
339- CompletableFuture <? > future = tableCache .allRequests .remove (req );
366+ CompletableFuture <RegionLocations > future = tableCache .allRequests .remove (req );
340367 if (future != null ) {
341- future . completeExceptionally ( error );
368+ futureResultList . add ( new RegionLocationsFutureResult ( future , null , error ) );
342369 }
343- tableCache .clearCompletedRequests (null );
370+ futureResultList . addAll ( tableCache .clearCompletedRequests (null ) );
344371 // Remove a complete locate request in a synchronized block, so the table cache must have
345372 // quota to send a candidate request.
346373 toSend = tableCache .getCandidate ();
347374 toSend .ifPresent (r -> tableCache .send (r ));
348375 }
376+ futureResultList .forEach (RegionLocationsFutureResult ::complete );
349377 toSend .ifPresent (r -> locateInMeta (tableName , r ));
350378 }
351379 }
@@ -543,9 +571,11 @@ public void onNext(Result[] results, ScanController controller) {
543571 continue ;
544572 }
545573 RegionLocations addedLocs = addToCache (tableCache , locs );
574+ List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
546575 synchronized (tableCache ) {
547- tableCache .clearCompletedRequests (addedLocs );
576+ futureResultList . addAll ( tableCache .clearCompletedRequests (addedLocs ) );
548577 }
578+ futureResultList .forEach (RegionLocationsFutureResult ::complete );
549579 }
550580 }
551581 }
@@ -677,12 +707,16 @@ void clearCache(TableName tableName) {
677707 if (tableCache == null ) {
678708 return ;
679709 }
710+ List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
680711 synchronized (tableCache ) {
681712 if (!tableCache .allRequests .isEmpty ()) {
682713 IOException error = new IOException ("Cache cleared" );
683- tableCache .allRequests .values ().forEach (f -> f .completeExceptionally (error ));
714+ tableCache .allRequests .values ().forEach (f -> {
715+ futureResultList .add (new RegionLocationsFutureResult (f , null , error ));
716+ });
684717 }
685718 }
719+ futureResultList .forEach (RegionLocationsFutureResult ::complete );
686720 conn .getConnectionMetrics ()
687721 .ifPresent (metrics -> metrics .incrMetaCacheNumClearRegion (tableCache .cache .size ()));
688722 }
0 commit comments