Skip to content

Commit 9ce4e5e

Browse files
committed
add javadoc
1 parent 1f842eb commit 9ce4e5e

File tree

2 files changed

+22
-4
lines changed

2 files changed

+22
-4
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,9 @@ private Call(RPC.RpcKind rpcKind, Writable param) {
308308

309309
this.externalHandler = EXTERNAL_CALL_HANDLER.get();
310310
this.completableFuture = CALL_FUTURE_THREAD_LOCAL.get();
311-
CALL_FUTURE_THREAD_LOCAL.remove();
311+
if (completableFuture != null) {
312+
CALL_FUTURE_THREAD_LOCAL.remove();
313+
}
312314
}
313315

314316
@Override

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.TimeUnit;
4747
import java.util.concurrent.TimeoutException;
4848

49+
import static org.apache.hadoop.ipc.TestAsyncIPC.AsyncCompletableFutureCaller.RPC_SERVER_COST_MS;
4950
import static org.junit.Assert.assertEquals;
5051
import static org.junit.Assert.assertFalse;
5152
import static org.junit.Assert.assertTrue;
@@ -147,6 +148,7 @@ void assertReturnValues(long timeout, TimeUnit unit)
147148
* implemented with CompletableFuture.
148149
*/
149150
static class AsyncCompletableFutureCaller extends Thread {
151+
public static final long RPC_SERVER_COST_MS = 100;
150152
private final Client client;
151153
private final InetSocketAddress server;
152154
private final int count;
@@ -183,7 +185,7 @@ public void run() {
183185
assertTrue(Thread.currentThread().getName().contains("connection"));
184186
try {
185187
// Since the current call has already been completed,
186-
// this method does not need to block.
188+
// this method does not need to wait.
187189
return asyncRpcResponse.get();
188190
} catch (Exception e) {
189191
throw new CompletionException(e);
@@ -196,7 +198,7 @@ public void run() {
196198
// so the time taken by the run method is less than count * 100
197199
// (where 100 is the time taken by the server to process a request).
198200
long cost = Time.monotonicNow() - startTime;
199-
assertTrue(cost < count * 100);
201+
assertTrue(cost < count * RPC_SERVER_COST_MS);
200202
LOG.info("[{}] run cost {}ms", Thread.currentThread().getName(), cost);
201203
} catch (Exception e) {
202204
fail();
@@ -615,6 +617,20 @@ public void run() {
615617
}
616618
}
617619

620+
/**
621+
* Tests the asynchronous call functionality using {@link CompletableFuture}.
622+
*
623+
* <p>The test sets up an RPC server with a specified number of handler threads,
624+
* starts the server, and sends a predefined number of asynchronous requests.
625+
* Each request is expected to take a certain amount of time to process as defined
626+
* by RPC_SERVER_COST_MS. The test verifies that the server responses are received
627+
* and match the expected values, thus validating the asynchronous call mechanism.</p>
628+
*
629+
* @throws IOException If an I/O error occurs during the test.
630+
* @throws InterruptedException If the current thread is interrupted while waiting.
631+
* @throws ExecutionException If an exception is thrown while computing the result of a
632+
* {@link CompletableFuture}.
633+
*/
618634
@Test(timeout = 60000)
619635
public void testAsyncCallWithCompletableFuture() throws IOException,
620636
InterruptedException, ExecutionException {
@@ -626,7 +642,7 @@ public void testAsyncCallWithCompletableFuture() throws IOException,
626642
server.callListener = () -> {
627643
try {
628644
// The server requires at least 100 milliseconds to process a request.
629-
Thread.sleep(100);
645+
Thread.sleep(RPC_SERVER_COST_MS);
630646
} catch (InterruptedException e) {
631647
throw new RuntimeException(e);
632648
}

0 commit comments

Comments
 (0)