Skip to content

Commit e00d40c

Browse files
committed
HBASE-26545 Implement tracing of scan
* On `AsyncTable`, both the `scan` and `scanAll` methods should result in `SCAN` table operations.
* The span of the `SCAN` table operation should have children representing all the RPC calls involved in servicing the scan.
* When a user provides a custom implementation of `AdvancedScanResultConsumer`, any spans emitted from the callback methods should also be tied to the span that represents the `SCAN` table operation. This is easily done because these callbacks are executed on the RPC thread.
* When a user provides a custom implementation of `ScanResultConsumer`, any spans emitted from the callback methods should also be tied to the span that represents the `SCAN` table operation. This is accomplished by carefully passing the span instance around after it is created.
1 parent be0afbf commit e00d40c

23 files changed

+1359
-233
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java

Lines changed: 101 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information
@@ -27,19 +27,21 @@
2727
import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
2828
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
2929
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
30-
30+
import io.opentelemetry.api.trace.Span;
31+
import io.opentelemetry.api.trace.StatusCode;
32+
import io.opentelemetry.context.Scope;
3133
import java.io.IOException;
3234
import java.util.concurrent.CompletableFuture;
3335
import java.util.concurrent.TimeUnit;
3436
import java.util.concurrent.atomic.AtomicInteger;
3537
import org.apache.hadoop.hbase.HRegionLocation;
3638
import org.apache.hadoop.hbase.TableName;
3739
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
40+
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
3841
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
42+
import org.apache.hadoop.hbase.trace.TraceUtil;
3943
import org.apache.yetus.audience.InterfaceAudience;
40-
4144
import org.apache.hbase.thirdparty.io.netty.util.Timer;
42-
4345
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
4446
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
4547
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
@@ -85,6 +87,17 @@ class AsyncClientScanner {
8587

8688
private final ScanResultCache resultCache;
8789

90+
/*
91+
* The `span` instance is accessed concurrently by several threads, but we use only basic
92+
* synchronization. The class claims to be `@ThreadSafe` so we rely on the implementation to
93+
* correctly handle concurrent use. The instance itself is initialized in the `start` method,
94+
* so we cannot make it `final`. Because the instance is created before any consuming runnable
95+
* is submitted to a worker pool, it should be enough to mark it as `volatile`. To avoid over-
96+
* paying the price of the memory barrier, where a consumer makes multiple uses of the `span`
97+
* instance, we use a local final non-volatile reference.
98+
*/
99+
private volatile Span span = null;
100+
88101
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
89102
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
90103
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
@@ -140,26 +153,37 @@ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, In
140153

141154
private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
142155
HRegionLocation loc, ClientService.Interface stub) {
143-
boolean isRegionServerRemote = isRemote(loc.getHostname());
144-
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
145-
if (openScannerTries.getAndIncrement() > 1) {
146-
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
147-
}
148-
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
149-
try {
150-
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan,
151-
scan.getCaching(), false);
152-
stub.scan(controller, request, resp -> {
153-
if (controller.failed()) {
154-
future.completeExceptionally(controller.getFailed());
155-
return;
156-
}
157-
future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
158-
});
159-
} catch (IOException e) {
160-
future.completeExceptionally(e);
156+
final Span localSpan = span;
157+
try (Scope ignored = localSpan.makeCurrent()) {
158+
boolean isRegionServerRemote = isRemote(loc.getHostname());
159+
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
160+
if (openScannerTries.getAndIncrement() > 1) {
161+
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
162+
}
163+
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
164+
try {
165+
ScanRequest request = RequestConverter.buildScanRequest(
166+
loc.getRegion().getRegionName(), scan, scan.getCaching(), false);
167+
stub.scan(controller, request, resp -> {
168+
try (Scope ignored1 = localSpan.makeCurrent()) {
169+
if (controller.failed()) {
170+
final IOException e = controller.getFailed();
171+
future.completeExceptionally(e);
172+
TraceUtil.setError(localSpan, e);
173+
localSpan.end();
174+
return;
175+
}
176+
future.complete(new OpenScannerResponse(
177+
loc, isRegionServerRemote, stub, controller, resp));
178+
}
179+
});
180+
} catch (IOException e) {
181+
TraceUtil.setError(localSpan, e);
182+
localSpan.end();
183+
future.completeExceptionally(e);
184+
}
185+
return future;
161186
}
162-
return future;
163187
}
164188

165189
private void startScan(OpenScannerResponse resp) {
@@ -173,26 +197,41 @@ private void startScan(OpenScannerResponse resp) {
173197
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
174198
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
175199
(hasMore, error) -> {
176-
if (error != null) {
177-
consumer.onError(error);
178-
return;
179-
}
180-
if (hasMore) {
181-
openScanner();
182-
} else {
183-
consumer.onComplete();
200+
final Span localSpan = span;
201+
try (Scope ignored = localSpan.makeCurrent()) {
202+
if (error != null) {
203+
try {
204+
consumer.onError(error);
205+
return;
206+
} finally {
207+
TraceUtil.setError(localSpan, error);
208+
localSpan.end();
209+
}
210+
}
211+
if (hasMore) {
212+
openScanner();
213+
} else {
214+
try {
215+
consumer.onComplete();
216+
} finally {
217+
localSpan.setStatus(StatusCode.OK);
218+
localSpan.end();
219+
}
220+
}
184221
}
185222
});
186223
}
187224

188225
private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
189-
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
190-
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
191-
.priority(scan.getPriority())
192-
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
193-
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
194-
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
195-
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
226+
try (Scope ignored = span.makeCurrent()) {
227+
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
228+
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
229+
.priority(scan.getPriority())
230+
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
231+
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
232+
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
233+
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
234+
}
196235
}
197236

198237
private long getPrimaryTimeoutNs() {
@@ -206,15 +245,34 @@ private void openScanner() {
206245
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
207246
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
208247
conn.getConnectionMetrics()), (resp, error) -> {
209-
if (error != null) {
210-
consumer.onError(error);
211-
return;
248+
final Span localSpan = span;
249+
try (Scope ignored = localSpan.makeCurrent()) {
250+
if (error != null) {
251+
try {
252+
consumer.onError(error);
253+
return;
254+
} finally {
255+
TraceUtil.setError(localSpan, error);
256+
localSpan.end();
257+
}
258+
}
259+
startScan(resp);
212260
}
213-
startScan(resp);
214261
});
215262
}
216263

217264
public void start() {
218-
openScanner();
265+
final Span localSpan = new TableOperationSpanBuilder(conn)
266+
.setTableName(tableName)
267+
.setOperation(scan)
268+
.build();
269+
if (consumer instanceof AsyncTableResultScanner) {
270+
AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer;
271+
scanner.setSpan(localSpan);
272+
}
273+
span = localSpan;
274+
try (Scope ignored = localSpan.makeCurrent()) {
275+
openScanner();
276+
}
219277
}
220278
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
2929
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
3030

31+
import io.opentelemetry.context.Context;
3132
import java.io.IOException;
3233
import java.util.ArrayList;
3334
import java.util.List;
@@ -573,7 +574,8 @@ private void call() {
573574
resetController(controller, callTimeoutNs, priority);
574575
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
575576
nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
576-
stub.scan(controller, req, resp -> onComplete(controller, resp));
577+
Context context = Context.current();
578+
stub.scan(controller, req, resp -> context.wrap(() -> onComplete(controller, resp)).run());
577579
}
578580

579581
private void next() {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.hadoop.hbase.client;
1919

2020
import static java.util.stream.Collectors.toList;
21-
21+
import io.opentelemetry.api.trace.Span;
22+
import io.opentelemetry.context.Context;
23+
import io.opentelemetry.context.Scope;
2224
import java.io.IOException;
2325
import java.util.List;
2426
import java.util.concurrent.CompletableFuture;
@@ -32,7 +34,6 @@
3234
import org.apache.hadoop.hbase.io.TimeRange;
3335
import org.apache.hadoop.hbase.util.FutureUtils;
3436
import org.apache.yetus.audience.InterfaceAudience;
35-
3637
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
3738

3839
/**
@@ -232,22 +233,29 @@ public ResultScanner getScanner(Scan scan) {
232233
}
233234

234235
private void scan0(Scan scan, ScanResultConsumer consumer) {
235-
try (ResultScanner scanner = getScanner(scan)) {
236-
consumer.onScanMetricsCreated(scanner.getScanMetrics());
237-
for (Result result; (result = scanner.next()) != null;) {
238-
if (!consumer.onNext(result)) {
239-
break;
236+
Span span = null;
237+
try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
238+
span = scanner.getSpan();
239+
try (Scope ignored = span.makeCurrent()) {
240+
consumer.onScanMetricsCreated(scanner.getScanMetrics());
241+
for (Result result; (result = scanner.next()) != null; ) {
242+
if (!consumer.onNext(result)) {
243+
break;
244+
}
240245
}
246+
consumer.onComplete();
241247
}
242-
consumer.onComplete();
243248
} catch (IOException e) {
244-
consumer.onError(e);
249+
try (Scope ignored = span.makeCurrent()) {
250+
consumer.onError(e);
251+
}
245252
}
246253
}
247254

248255
@Override
249256
public void scan(Scan scan, ScanResultConsumer consumer) {
250-
pool.execute(() -> scan0(scan, consumer));
257+
final Context context = Context.current();
258+
pool.execute(context.wrap(() -> scan0(scan, consumer)));
251259
}
252260

253261
@Override

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.hadoop.hbase.client;
1919

2020
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
21-
21+
import io.opentelemetry.api.trace.Span;
2222
import java.io.IOException;
2323
import java.io.InterruptedIOException;
2424
import java.util.ArrayDeque;
@@ -58,6 +58,9 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
5858

5959
private ScanResumer resumer;
6060

61+
// Used to pass the span instance to the `AsyncTableImpl` from its underlying `rawAsyncTable`.
62+
private Span span = null;
63+
6164
public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) {
6265
this.tableName = tableName;
6366
this.maxCacheSize = maxCacheSize;
@@ -79,6 +82,14 @@ private void stopPrefetch(ScanController controller) {
7982
resumer = controller.suspend();
8083
}
8184

85+
Span getSpan() {
86+
return span;
87+
}
88+
89+
void setSpan(final Span span) {
90+
this.span = span;
91+
}
92+
8293
@Override
8394
public synchronized void onNext(Result[] results, ScanController controller) {
8495
assert results.length > 0;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -640,30 +640,26 @@ public AsyncTableResultScanner getScanner(Scan scan) {
640640

641641
@Override
642642
public CompletableFuture<List<Result>> scanAll(Scan scan) {
643-
final Supplier<Span> supplier = newTableOperationSpanBuilder()
644-
.setOperation(scan);
645-
return tracedFuture(() -> {
646-
CompletableFuture<List<Result>> future = new CompletableFuture<>();
647-
List<Result> scanResults = new ArrayList<>();
648-
scan(scan, new AdvancedScanResultConsumer() {
643+
CompletableFuture<List<Result>> future = new CompletableFuture<>();
644+
List<Result> scanResults = new ArrayList<>();
645+
scan(scan, new AdvancedScanResultConsumer() {
649646

650-
@Override
651-
public void onNext(Result[] results, ScanController controller) {
652-
scanResults.addAll(Arrays.asList(results));
653-
}
647+
@Override
648+
public void onNext(Result[] results, ScanController controller) {
649+
scanResults.addAll(Arrays.asList(results));
650+
}
654651

655-
@Override
656-
public void onError(Throwable error) {
657-
future.completeExceptionally(error);
658-
}
652+
@Override
653+
public void onError(Throwable error) {
654+
future.completeExceptionally(error);
655+
}
659656

660-
@Override
661-
public void onComplete() {
662-
future.complete(scanResults);
663-
}
664-
});
665-
return future;
666-
}, supplier);
657+
@Override
658+
public void onComplete() {
659+
future.complete(scanResults);
660+
}
661+
});
662+
return future;
667663
}
668664

669665
@Override

0 commit comments

Comments
 (0)