3535import static org .apache .hadoop .hbase .util .ConcurrentMapUtils .computeIfAbsent ;
3636
3737import java .io .IOException ;
38+ import java .util .ArrayList ;
3839import java .util .Arrays ;
3940import java .util .HashSet ;
4041import java .util .Iterator ;
4142import java .util .LinkedHashMap ;
43+ import java .util .List ;
4244import java .util .Map ;
4345import java .util .Optional ;
4446import java .util .Set ;
@@ -122,6 +124,26 @@ public boolean equals(Object obj) {
122124 }
123125 }
124126
127+ private static final class RegionLocationsFutureResult {
128+ private final CompletableFuture <RegionLocations > future ;
129+ private final RegionLocations result ;
130+ private final Throwable e ;
131+
132+ public RegionLocationsFutureResult (CompletableFuture <RegionLocations > future ,
133+ RegionLocations result , Throwable e ) {
134+ this .future = future ;
135+ this .result = result ;
136+ this .e = e ;
137+ }
138+
139+ public void complete () {
140+ if (e != null ) {
141+ future .completeExceptionally (e );
142+ }
143+ future .complete (result );
144+ }
145+ }
146+
125147 private static final class TableCache {
126148
127149 private final ConcurrentNavigableMap <byte [], RegionLocations > cache =
@@ -148,18 +170,20 @@ public Optional<LocateRequest> getCandidate() {
148170 return allRequests .keySet ().stream ().filter (r -> !isPending (r )).findFirst ();
149171 }
150172
151- public void clearCompletedRequests (RegionLocations locations ) {
173+ public List <RegionLocationsFutureResult > clearCompletedRequests (RegionLocations locations ) {
174+ List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
152175 for (Iterator <Map .Entry <LocateRequest , CompletableFuture <RegionLocations >>> iter =
153176 allRequests .entrySet ().iterator (); iter .hasNext ();) {
154177 Map .Entry <LocateRequest , CompletableFuture <RegionLocations >> entry = iter .next ();
155- if (tryComplete (entry .getKey (), entry .getValue (), locations )) {
178+ if (tryComplete (entry .getKey (), entry .getValue (), locations , futureResultList )) {
156179 iter .remove ();
157180 }
158181 }
182+ return futureResultList ;
159183 }
160184
161185 private boolean tryComplete (LocateRequest req , CompletableFuture <RegionLocations > future ,
162- RegionLocations locations ) {
186+ RegionLocations locations , List < RegionLocationsFutureResult > futureResultList ) {
163187 if (future .isDone ()) {
164188 return true ;
165189 }
@@ -185,7 +209,7 @@ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations
185209 completed = loc .getRegion ().containsRow (req .row );
186210 }
187211 if (completed ) {
188- future . complete ( locations );
212+ futureResultList . add ( new RegionLocationsFutureResult ( future , locations , null ) );
189213 return true ;
190214 } else {
191215 return false ;
@@ -319,32 +343,36 @@ private void complete(TableName tableName, LocateRequest req, RegionLocations lo
319343 TableCache tableCache = getTableCache (tableName );
320344 if (locs != null ) {
321345 RegionLocations addedLocs = addToCache (tableCache , locs );
346+ List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
322347 synchronized (tableCache ) {
323348 tableCache .pendingRequests .remove (req );
324- tableCache .clearCompletedRequests (addedLocs );
349+ futureResultList . addAll ( tableCache .clearCompletedRequests (addedLocs ) );
325350 // Remove a complete locate request in a synchronized block, so the table cache must have
326351 // quota to send a candidate request.
327352 toSend = tableCache .getCandidate ();
328353 toSend .ifPresent (r -> tableCache .send (r ));
329354 }
355+ futureResultList .forEach (RegionLocationsFutureResult ::complete );
330356 toSend .ifPresent (r -> locateInMeta (tableName , r ));
331357 } else {
332358 // we meet an error
333359 assert error != null ;
360+ List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
334361 synchronized (tableCache ) {
335362 tableCache .pendingRequests .remove (req );
336363 // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
337364 // already retried several times
338- CompletableFuture <? > future = tableCache .allRequests .remove (req );
365+ CompletableFuture <RegionLocations > future = tableCache .allRequests .remove (req );
339366 if (future != null ) {
340- future . completeExceptionally ( error );
367+ futureResultList . add ( new RegionLocationsFutureResult ( future , null , error ) );
341368 }
342- tableCache .clearCompletedRequests (null );
369+ futureResultList . addAll ( tableCache .clearCompletedRequests (null ) );
343370 // Remove a complete locate request in a synchronized block, so the table cache must have
344371 // quota to send a candidate request.
345372 toSend = tableCache .getCandidate ();
346373 toSend .ifPresent (r -> tableCache .send (r ));
347374 }
375+ futureResultList .forEach (RegionLocationsFutureResult ::complete );
348376 toSend .ifPresent (r -> locateInMeta (tableName , r ));
349377 }
350378 }
@@ -542,9 +570,11 @@ public void onNext(Result[] results, ScanController controller) {
542570 continue ;
543571 }
544572 RegionLocations addedLocs = addToCache (tableCache , locs );
573+ List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
545574 synchronized (tableCache ) {
546- tableCache .clearCompletedRequests (addedLocs );
575+ futureResultList . addAll ( tableCache .clearCompletedRequests (addedLocs ) );
547576 }
577+ futureResultList .forEach (RegionLocationsFutureResult ::complete );
548578 }
549579 }
550580 }
@@ -676,12 +706,16 @@ void clearCache(TableName tableName) {
676706 if (tableCache == null ) {
677707 return ;
678708 }
709+ List <RegionLocationsFutureResult > futureResultList = new ArrayList <>();
679710 synchronized (tableCache ) {
680711 if (!tableCache .allRequests .isEmpty ()) {
681712 IOException error = new IOException ("Cache cleared" );
682- tableCache .allRequests .values ().forEach (f -> f .completeExceptionally (error ));
713+ tableCache .allRequests .values ().forEach (f -> {
714+ futureResultList .add (new RegionLocationsFutureResult (f , null , error ));
715+ });
683716 }
684717 }
718+ futureResultList .forEach (RegionLocationsFutureResult ::complete );
685719 conn .getConnectionMetrics ()
686720 .ifPresent (metrics -> metrics .incrMetaCacheNumClearRegion (tableCache .cache .size ()));
687721 }
0 commit comments