Skip to content

Commit 3f10e98

Browse files
authored
Merge branch 'master' into HBASE-26867
2 parents 66e9807 + 0bbc8d1 commit 3f10e98

File tree

135 files changed

+5150
-2600
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

135 files changed

+5150
-2600
lines changed

dev-support/create-release/release-util.sh

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,12 @@ function get_release_info {
146146
if [[ -z "${ASF_REPO}" ]]; then
147147
ASF_REPO="https://gitbox.apache.org/repos/asf/${PROJECT}.git"
148148
fi
149-
if [[ -z "${ASF_REPO_WEBUI}" ]]; then
150-
ASF_REPO_WEBUI="https://gitbox.apache.org/repos/asf?p=${PROJECT}.git"
151-
fi
152149
if [[ -z "${ASF_GITHUB_REPO}" ]]; then
153150
ASF_GITHUB_REPO="https://github.com/apache/${PROJECT}"
154151
fi
152+
if [[ -z "${ASF_GITHUB_WEBUI}" ]] ; then
153+
ASF_GITHUB_WEBUI="https://raw.githubusercontent.com/apache/${PROJECT}"
154+
fi
155155
if [ -z "$GIT_BRANCH" ]; then
156156
# If no branch is specified, find out the latest branch from the repo.
157157
GIT_BRANCH="$(git ls-remote --heads "$ASF_REPO" |
@@ -167,14 +167,14 @@ function get_release_info {
167167

168168
# Find the current version for the branch.
169169
local version
170-
version="$(curl -s "$ASF_REPO_WEBUI;a=blob_plain;f=pom.xml;hb=refs/heads/$GIT_BRANCH" |
170+
version="$(curl -s "$ASF_GITHUB_WEBUI/refs/heads/$GIT_BRANCH/pom.xml" |
171171
parse_version)"
172172
# We do not want to expand ${revision} here, see https://maven.apache.org/maven-ci-friendly.html
173173
# If we use ${revision} as placeholder, we need to parse the revision property to
174174
# get maven version
175175
# shellcheck disable=SC2016
176176
if [[ "${version}" == '${revision}' ]]; then
177-
version="$(curl -s "$ASF_REPO_WEBUI;a=blob_plain;f=pom.xml;hb=refs/heads/$GIT_BRANCH" |
177+
version="$(curl -s "$ASF_GITHUB_WEBUI/refs/heads/$GIT_BRANCH/pom.xml" |
178178
parse_revision)"
179179
fi
180180
log "Current branch VERSION is $version."

dev-support/git-jira-release-audit/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ blessed==1.17.0
1919
certifi==2022.12.07
2020
cffi==1.13.2
2121
chardet==3.0.4
22-
cryptography==39.0.1
22+
cryptography==41.0.0
2323
defusedxml==0.6.0
2424
enlighten==1.4.0
2525
gitdb2==2.0.6

hbase-backup/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@
4343
<type>test-jar</type>
4444
<scope>test</scope>
4545
</dependency>
46+
<dependency>
47+
<groupId>javax.ws.rs</groupId>
48+
<artifactId>javax.ws.rs-api</artifactId>
49+
<scope>test</scope>
50+
</dependency>
4651
<dependency>
4752
<groupId>org.apache.hbase</groupId>
4853
<artifactId>hbase-client</artifactId>
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import java.util.Collections;
21+
import java.util.Map;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.function.Function;
25+
import org.apache.hadoop.hbase.ServerName;
26+
import org.apache.hadoop.hbase.util.FutureUtils;
27+
import org.apache.yetus.audience.InterfaceAudience;
28+
29+
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
30+
31+
/**
32+
* Additional Asynchronous Admin capabilities for clients.
33+
*/
34+
@InterfaceAudience.Public
35+
public final class AsyncAdminClientUtils {
36+
37+
private AsyncAdminClientUtils() {
38+
}
39+
40+
/**
41+
* Execute the given coprocessor call on all region servers.
42+
* <p>
43+
* The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
44+
* one line lambda expression, like:
45+
*
46+
* <pre>
47+
* channel -&gt; xxxService.newStub(channel)
48+
* </pre>
49+
*
50+
* @param asyncAdmin the asynchronous administrative API for HBase.
51+
* @param stubMaker a delegation to the actual {@code newStub} call.
52+
* @param callable a delegation to the actual protobuf rpc call. See the comment of
53+
* {@link ServiceCaller} for more details.
54+
* @param <S> the type of the asynchronous stub
55+
* @param <R> the type of the return value
56+
* @return Map of each region server to its result of the protobuf rpc call, wrapped by a
57+
* {@link CompletableFuture}.
58+
* @see ServiceCaller
59+
*/
60+
public static <S, R> CompletableFuture<Map<ServerName, Object>>
61+
coprocessorServiceOnAllRegionServers(AsyncAdmin asyncAdmin, Function<RpcChannel, S> stubMaker,
62+
ServiceCaller<S, R> callable) {
63+
CompletableFuture<Map<ServerName, Object>> future = new CompletableFuture<>();
64+
FutureUtils.addListener(asyncAdmin.getRegionServers(), (regionServers, error) -> {
65+
if (error != null) {
66+
future.completeExceptionally(error);
67+
return;
68+
}
69+
Map<ServerName, Object> resultMap = new ConcurrentHashMap<>();
70+
for (ServerName regionServer : regionServers) {
71+
FutureUtils.addListener(asyncAdmin.coprocessorService(stubMaker, callable, regionServer),
72+
(server, err) -> {
73+
if (err != null) {
74+
resultMap.put(regionServer, err);
75+
} else {
76+
resultMap.put(regionServer, server);
77+
}
78+
if (resultMap.size() == regionServers.size()) {
79+
future.complete(Collections.unmodifiableMap(resultMap));
80+
}
81+
});
82+
}
83+
});
84+
return future;
85+
}
86+
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
2121

2222
import java.io.IOException;
23+
import java.util.Collections;
2324
import java.util.concurrent.CompletableFuture;
2425
import org.apache.hadoop.hbase.ServerName;
2526
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -44,7 +45,7 @@ public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl con
4445
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
4546
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
4647
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
47-
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
48+
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap());
4849
this.serverName = serverName;
4950
this.callable = callable;
5051
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 25 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818
package org.apache.hadoop.hbase.client;
1919

2020
import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
21-
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
2221
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
23-
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
2422
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
2523
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
2624
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
@@ -35,6 +33,7 @@
3533
import java.util.List;
3634
import java.util.Map;
3735
import java.util.Optional;
36+
import java.util.OptionalLong;
3837
import java.util.concurrent.CompletableFuture;
3938
import java.util.concurrent.ConcurrentHashMap;
4039
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -56,6 +55,7 @@
5655
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
5756
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
5857
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
58+
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
5959
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
6060
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
6161
import org.apache.hadoop.hbase.util.Bytes;
@@ -102,10 +102,6 @@ class AsyncBatchRpcRetryingCaller<T> {
102102

103103
private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
104104

105-
private final long pauseNs;
106-
107-
private final long pauseNsForServerOverloaded;
108-
109105
private final int maxAttempts;
110106

111107
private final long operationTimeoutNs;
@@ -116,6 +112,10 @@ class AsyncBatchRpcRetryingCaller<T> {
116112

117113
private final long startNs;
118114

115+
private final HBaseServerExceptionPauseManager pauseManager;
116+
117+
private final Map<String, byte[]> requestAttributes;
118+
119119
// we can not use HRegionLocation as the map key because the hashCode and equals method of
120120
// HRegionLocation only consider serverName.
121121
private static final class RegionRequest {
@@ -151,12 +151,11 @@ public int getPriority() {
151151

152152
public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
153153
TableName tableName, List<? extends Row> actions, long pauseNs, long pauseNsForServerOverloaded,
154-
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
154+
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
155+
Map<String, byte[]> requestAttributes) {
155156
this.retryTimer = retryTimer;
156157
this.conn = conn;
157158
this.tableName = tableName;
158-
this.pauseNs = pauseNs;
159-
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
160159
this.maxAttempts = maxAttempts;
161160
this.operationTimeoutNs = operationTimeoutNs;
162161
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -182,6 +181,9 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
182181
}
183182
this.action2Errors = new IdentityHashMap<>();
184183
this.startNs = System.nanoTime();
184+
this.pauseManager =
185+
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
186+
this.requestAttributes = requestAttributes;
185187
}
186188

187189
private static boolean hasIncrementOrAppend(Row action) {
@@ -204,10 +206,6 @@ private static boolean hasIncrementOrAppend(RowMutations mutations) {
204206
return false;
205207
}
206208

207-
private long remainingTimeNs() {
208-
return operationTimeoutNs - (System.nanoTime() - startNs);
209-
}
210-
211209
private List<ThrowableWithExtraContext> removeErrors(Action action) {
212210
synchronized (action2Errors) {
213211
return action2Errors.remove(action);
@@ -360,14 +358,14 @@ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
360358
}
361359
});
362360
if (!failedActions.isEmpty()) {
363-
tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
361+
tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), null);
364362
}
365363
}
366364

367365
private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
368366
long remainingNs;
369367
if (operationTimeoutNs > 0) {
370-
remainingNs = remainingTimeNs();
368+
remainingNs = pauseManager.remainingTimeNs(startNs);
371369
if (remainingNs <= 0) {
372370
failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
373371
tries);
@@ -398,6 +396,7 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr
398396
HBaseRpcController controller = conn.rpcControllerFactory.newController();
399397
resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
400398
calcPriority(serverReq.getPriority(), tableName));
399+
controller.setRequestAttributes(requestAttributes);
401400
if (!cells.isEmpty()) {
402401
controller.setCellScanner(createCellScanner(cells));
403402
}
@@ -465,30 +464,23 @@ private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Thro
465464
List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
466465
.collect(Collectors.toList());
467466
addError(copiedActions, error, serverName);
468-
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
469-
HBaseServerException.isServerOverloaded(error));
467+
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, error);
470468
}
471469

472470
private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
473-
boolean isServerOverloaded) {
471+
Throwable error) {
474472
if (immediately) {
475473
groupAndSend(actions, tries);
476474
return;
477475
}
478-
long delayNs;
479-
long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
480-
if (operationTimeoutNs > 0) {
481-
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
482-
if (maxDelayNs <= 0) {
483-
failAll(actions, tries);
484-
return;
485-
}
486-
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
487-
} else {
488-
delayNs = getPauseTime(pauseNsToUse, tries - 1);
489-
}
490476

491-
if (isServerOverloaded) {
477+
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
478+
if (!maybePauseNsToUse.isPresent()) {
479+
failAll(actions, tries);
480+
return;
481+
}
482+
long delayNs = maybePauseNsToUse.getAsLong();
483+
if (HBaseServerException.isServerOverloaded(error)) {
492484
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
493485
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
494486
}
@@ -498,7 +490,7 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
498490
private void groupAndSend(Stream<Action> actions, int tries) {
499491
long locateTimeoutNs;
500492
if (operationTimeoutNs > 0) {
501-
locateTimeoutNs = remainingTimeNs();
493+
locateTimeoutNs = pauseManager.remainingTimeNs(startNs);
502494
if (locateTimeoutNs <= 0) {
503495
failAll(actions, tries);
504496
return;
@@ -529,7 +521,7 @@ private void groupAndSend(Stream<Action> actions, int tries) {
529521
sendOrDelay(actionsByServer, tries);
530522
}
531523
if (!locateFailed.isEmpty()) {
532-
tryResubmit(locateFailed.stream(), tries, false, false);
524+
tryResubmit(locateFailed.stream(), tries, false, null);
533525
}
534526
});
535527
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.opentelemetry.api.trace.StatusCode;
3333
import io.opentelemetry.context.Scope;
3434
import java.io.IOException;
35+
import java.util.Map;
3536
import java.util.concurrent.CompletableFuture;
3637
import java.util.concurrent.TimeUnit;
3738
import java.util.concurrent.atomic.AtomicInteger;
@@ -92,9 +93,12 @@ class AsyncClientScanner {
9293

9394
private final Span span;
9495

96+
private final Map<String, byte[]> requestAttributes;
97+
9598
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
9699
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
97-
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
100+
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
101+
Map<String, byte[]> requestAttributes) {
98102
if (scan.getStartRow() == null) {
99103
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
100104
}
@@ -113,6 +117,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
113117
this.rpcTimeoutNs = rpcTimeoutNs;
114118
this.startLogErrorsCnt = startLogErrorsCnt;
115119
this.resultCache = createScanResultCache(scan);
120+
this.requestAttributes = requestAttributes;
116121
if (scan.isScanMetricsEnabled()) {
117122
this.scanMetrics = new ScanMetrics();
118123
consumer.onScanMetricsCreated(scanMetrics);
@@ -191,15 +196,17 @@ private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcControlle
191196
}
192197

193198
private void startScan(OpenScannerResponse resp) {
194-
addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
195-
.location(resp.loc).remote(resp.isRegionServerRemote)
196-
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
197-
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
198-
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
199-
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
200-
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
201-
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
202-
.start(resp.controller, resp.resp), (hasMore, error) -> {
199+
addListener(
200+
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
201+
.remote(resp.isRegionServerRemote)
202+
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
203+
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
204+
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
205+
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
206+
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
207+
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
208+
.setRequestAttributes(requestAttributes).start(resp.controller, resp.resp),
209+
(hasMore, error) -> {
203210
try (Scope ignored = span.makeCurrent()) {
204211
if (error != null) {
205212
try {
@@ -231,8 +238,8 @@ private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
231238
.priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
232239
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
233240
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
234-
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
235-
.call();
241+
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
242+
.setRequestAttributes(requestAttributes).action(this::callOpenScanner).call();
236243
}
237244
}
238245

0 commit comments

Comments
 (0)