1- /**
1+ /*
22 * Licensed to the Apache Software Foundation (ASF) under one
33 * or more contributor license agreements. See the NOTICE file
44 * distributed with this work for additional information
2727import static org .apache .hadoop .hbase .client .ConnectionUtils .isRemote ;
2828import static org .apache .hadoop .hbase .client .ConnectionUtils .timelineConsistentRead ;
2929import static org .apache .hadoop .hbase .util .FutureUtils .addListener ;
30-
30+ import io .opentelemetry .api .trace .Span ;
31+ import io .opentelemetry .api .trace .StatusCode ;
32+ import io .opentelemetry .context .Scope ;
3133import java .io .IOException ;
3234import java .util .concurrent .CompletableFuture ;
3335import java .util .concurrent .TimeUnit ;
3436import java .util .concurrent .atomic .AtomicInteger ;
3537import org .apache .hadoop .hbase .HRegionLocation ;
3638import org .apache .hadoop .hbase .TableName ;
3739import org .apache .hadoop .hbase .client .metrics .ScanMetrics ;
40+ import org .apache .hadoop .hbase .client .trace .TableOperationSpanBuilder ;
3841import org .apache .hadoop .hbase .ipc .HBaseRpcController ;
42+ import org .apache .hadoop .hbase .trace .TraceUtil ;
3943import org .apache .yetus .audience .InterfaceAudience ;
40-
4144import org .apache .hbase .thirdparty .io .netty .util .Timer ;
42-
4345import org .apache .hadoop .hbase .shaded .protobuf .RequestConverter ;
4446import org .apache .hadoop .hbase .shaded .protobuf .generated .ClientProtos .ClientService ;
4547import org .apache .hadoop .hbase .shaded .protobuf .generated .ClientProtos .ClientService .Interface ;
@@ -85,6 +87,8 @@ class AsyncClientScanner {
8587
8688 private final ScanResultCache resultCache ;
8789
90+ private final Span span ;
91+
8892 public AsyncClientScanner (Scan scan , AdvancedScanResultConsumer consumer , TableName tableName ,
8993 AsyncConnectionImpl conn , Timer retryTimer , long pauseNs , long pauseForCQTBENs ,
9094 int maxAttempts , long scanTimeoutNs , long rpcTimeoutNs , int startLogErrorsCnt ) {
@@ -112,6 +116,21 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
112116 } else {
113117 this .scanMetrics = null ;
114118 }
119+
120+ /*
121+ * Assumes that the `start()` method is called immediately after construction. If this is no
122+ * longer the case, for tracing correctness, we should move the start of the span into the
123+ * `start()` method. The cost of doing so would be making access to the `span` safe for
124+ * concurrent threads.
125+ */
126+ span = new TableOperationSpanBuilder (conn )
127+ .setTableName (tableName )
128+ .setOperation (scan )
129+ .build ();
130+ if (consumer instanceof AsyncTableResultScanner ) {
131+ AsyncTableResultScanner scanner = (AsyncTableResultScanner ) consumer ;
132+ scanner .setSpan (span );
133+ }
115134 }
116135
117136 private static final class OpenScannerResponse {
@@ -140,26 +159,35 @@ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, In
140159
141160 private CompletableFuture <OpenScannerResponse > callOpenScanner (HBaseRpcController controller ,
142161 HRegionLocation loc , ClientService .Interface stub ) {
143- boolean isRegionServerRemote = isRemote (loc .getHostname ());
144- incRPCCallsMetrics (scanMetrics , isRegionServerRemote );
145- if (openScannerTries .getAndIncrement () > 1 ) {
146- incRPCRetriesMetrics (scanMetrics , isRegionServerRemote );
162+ try (Scope ignored = span .makeCurrent ()) {
163+ boolean isRegionServerRemote = isRemote (loc .getHostname ());
164+ incRPCCallsMetrics (scanMetrics , isRegionServerRemote );
165+ if (openScannerTries .getAndIncrement () > 1 ) {
166+ incRPCRetriesMetrics (scanMetrics , isRegionServerRemote );
167+ }
168+ CompletableFuture <OpenScannerResponse > future = new CompletableFuture <>();
169+ try {
170+ ScanRequest request = RequestConverter .buildScanRequest (
171+ loc .getRegion ().getRegionName (), scan , scan .getCaching (), false );
172+ stub .scan (controller , request , resp -> {
173+ try (Scope ignored1 = span .makeCurrent ()) {
174+ if (controller .failed ()) {
175+ final IOException e = controller .getFailed ();
176+ future .completeExceptionally (e );
177+ TraceUtil .setError (span , e );
178+ span .end ();
179+ return ;
180+ }
181+ future .complete (new OpenScannerResponse (
182+ loc , isRegionServerRemote , stub , controller , resp ));
183+ }
184+ });
185+ } catch (IOException e ) {
186+ // span is closed by listener attached to the Future in `openScanner()`
187+ future .completeExceptionally (e );
188+ }
189+ return future ;
147190 }
148- CompletableFuture <OpenScannerResponse > future = new CompletableFuture <>();
149- try {
150- ScanRequest request = RequestConverter .buildScanRequest (loc .getRegion ().getRegionName (), scan ,
151- scan .getCaching (), false );
152- stub .scan (controller , request , resp -> {
153- if (controller .failed ()) {
154- future .completeExceptionally (controller .getFailed ());
155- return ;
156- }
157- future .complete (new OpenScannerResponse (loc , isRegionServerRemote , stub , controller , resp ));
158- });
159- } catch (IOException e ) {
160- future .completeExceptionally (e );
161- }
162- return future ;
163191 }
164192
165193 private void startScan (OpenScannerResponse resp ) {
@@ -173,26 +201,40 @@ private void startScan(OpenScannerResponse resp) {
173201 .pauseForCQTBE (pauseForCQTBENs , TimeUnit .NANOSECONDS ).maxAttempts (maxAttempts )
174202 .startLogErrorsCnt (startLogErrorsCnt ).start (resp .controller , resp .resp ),
175203 (hasMore , error ) -> {
176- if (error != null ) {
177- consumer .onError (error );
178- return ;
179- }
180- if (hasMore ) {
181- openScanner ();
182- } else {
183- consumer .onComplete ();
204+ try (Scope ignored = span .makeCurrent ()) {
205+ if (error != null ) {
206+ try {
207+ consumer .onError (error );
208+ return ;
209+ } finally {
210+ TraceUtil .setError (span , error );
211+ span .end ();
212+ }
213+ }
214+ if (hasMore ) {
215+ openScanner ();
216+ } else {
217+ try {
218+ consumer .onComplete ();
219+ } finally {
220+ span .setStatus (StatusCode .OK );
221+ span .end ();
222+ }
223+ }
184224 }
185225 });
186226 }
187227
188228 private CompletableFuture <OpenScannerResponse > openScanner (int replicaId ) {
189- return conn .callerFactory .<OpenScannerResponse > single ().table (tableName )
190- .row (scan .getStartRow ()).replicaId (replicaId ).locateType (getLocateType (scan ))
191- .priority (scan .getPriority ())
192- .rpcTimeout (rpcTimeoutNs , TimeUnit .NANOSECONDS )
193- .operationTimeout (scanTimeoutNs , TimeUnit .NANOSECONDS ).pause (pauseNs , TimeUnit .NANOSECONDS )
194- .pauseForCQTBE (pauseForCQTBENs , TimeUnit .NANOSECONDS ).maxAttempts (maxAttempts )
195- .startLogErrorsCnt (startLogErrorsCnt ).action (this ::callOpenScanner ).call ();
229+ try (Scope ignored = span .makeCurrent ()) {
230+ return conn .callerFactory .<OpenScannerResponse > single ().table (tableName )
231+ .row (scan .getStartRow ()).replicaId (replicaId ).locateType (getLocateType (scan ))
232+ .priority (scan .getPriority ())
233+ .rpcTimeout (rpcTimeoutNs , TimeUnit .NANOSECONDS )
234+ .operationTimeout (scanTimeoutNs , TimeUnit .NANOSECONDS ).pause (pauseNs , TimeUnit .NANOSECONDS )
235+ .pauseForCQTBE (pauseForCQTBENs , TimeUnit .NANOSECONDS ).maxAttempts (maxAttempts )
236+ .startLogErrorsCnt (startLogErrorsCnt ).action (this ::callOpenScanner ).call ();
237+ }
196238 }
197239
198240 private long getPrimaryTimeoutNs () {
@@ -206,15 +248,24 @@ private void openScanner() {
206248 addListener (timelineConsistentRead (conn .getLocator (), tableName , scan , scan .getStartRow (),
207249 getLocateType (scan ), this ::openScanner , rpcTimeoutNs , getPrimaryTimeoutNs (), retryTimer ,
208250 conn .getConnectionMetrics ()), (resp , error ) -> {
209- if (error != null ) {
210- consumer .onError (error );
211- return ;
251+ try (Scope ignored = span .makeCurrent ()) {
252+ if (error != null ) {
253+ try {
254+ consumer .onError (error );
255+ return ;
256+ } finally {
257+ TraceUtil .setError (span , error );
258+ span .end ();
259+ }
260+ }
261+ startScan (resp );
212262 }
213- startScan (resp );
214263 });
215264 }
216265
217266 public void start () {
218- openScanner ();
267+ try (Scope ignored = span .makeCurrent ()) {
268+ openScanner ();
269+ }
219270 }
220271}
0 commit comments