Skip to content

Commit 65c5bc4

Browse files
committed
async util & ut
async util & ut async util & ut async util javadoc async util javadoc asyncTry feat test class javadoc test Finally & currentMethod
1 parent 2ee0bf9 commit 65c5bc4

File tree

15 files changed

+1769
-0
lines changed

15 files changed

+1769
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router.async;
19+
20+
import java.io.IOException;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.CompletionException;
23+
import java.util.concurrent.Executor;
24+
25+
/**
26+
* Represents a function that accepts a value of type T and produces a result of type R.
27+
* This interface extends {@link Async} and provides methods to apply the function
28+
* asynchronously using {@link CompletableFuture}.
29+
*
30+
* @param <T> the type of the input to the function
31+
* @param <R> the type of the result of the function
32+
*/
33+
@FunctionalInterface
34+
public interface ApplyFunction<T, R> extends Async<R>{
35+
36+
/**
37+
* Applies this function to the given argument.
38+
*
39+
* @param t the function argument
40+
* @return the function result
41+
* @throws IOException if an I/O error occurs
42+
*/
43+
R apply(T t) throws IOException;
44+
45+
/**
46+
* Applies this function asynchronously to the result of the given {@link CompletableFuture}.
47+
* The function is executed on the same thread as the completion of the given future.
48+
*
49+
* @param in the input future
50+
* @return a new future that holds the result of the function application
51+
*/
52+
default CompletableFuture<R> apply(CompletableFuture<T> in) {
53+
return in.thenApply(t -> {
54+
try {
55+
return ApplyFunction.this.apply(t);
56+
} catch (IOException e) {
57+
throw new CompletionException(e);
58+
}
59+
});
60+
}
61+
62+
/**
63+
* Applies this function asynchronously to the result of the given {@link CompletableFuture},
64+
* using the specified executor for the asynchronous computation.
65+
*
66+
* @param in the input future
67+
* @param executor the executor to use for the asynchronous computation
68+
* @return a new future that holds the result of the function application
69+
*/
70+
default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) {
71+
return in.thenApplyAsync(t -> {
72+
try {
73+
return ApplyFunction.this.apply(t);
74+
} catch (IOException e) {
75+
throw new CompletionException(e);
76+
}
77+
}, executor);
78+
}
79+
80+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router.async;
19+
20+
import java.io.IOException;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.ExecutionException;
23+
24+
/**
25+
* An interface for asynchronous operations, providing utility methods
26+
* and constants related to asynchronous computations.
27+
*
28+
* @param <R> The type of the result of the asynchronous operation
29+
*/
30+
public interface Async<R> {
31+
32+
/**
33+
* A thread-local variable to store the {@link CompletableFuture} instance for the current thread.
34+
* <p>
35+
* <b>Note:</b> After executing an asynchronous method, the thread stores the CompletableFuture
36+
* of the asynchronous method in the thread's local variable
37+
*/
38+
ThreadLocal<CompletableFuture<Object>> CUR_COMPLETABLE_FUTURE
39+
= new ThreadLocal<>();
40+
41+
/**
42+
* Sets the {@link CompletableFuture} instance for the current thread.
43+
*
44+
* @param completableFuture The {@link CompletableFuture} instance to be set
45+
* @param <T> The type of the result in the CompletableFuture
46+
*/
47+
default <T> void setCurCompletableFuture(CompletableFuture<T> completableFuture) {
48+
CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) completableFuture);
49+
}
50+
51+
/**
52+
* Gets the {@link CompletableFuture} instance for the current thread.
53+
*
54+
* @return The {@link CompletableFuture} instance for the current thread,
55+
* or {@code null} if not set
56+
*/
57+
default CompletableFuture<R> getCurCompletableFuture() {
58+
return (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
59+
}
60+
61+
/**
62+
* Blocks and retrieves the result of the {@link CompletableFuture} instance
63+
* for the current thread.
64+
*
65+
* @return The result of the CompletableFuture, or {@code null} if the thread was interrupted
66+
* @throws IOException If the completion exception to the CompletableFuture
67+
* is an IOException or a subclass of it
68+
*/
69+
default R result() throws IOException {
70+
try {
71+
CompletableFuture<R> completableFuture =
72+
(CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
73+
assert completableFuture != null;
74+
return completableFuture.get();
75+
} catch (InterruptedException e) {
76+
return null;
77+
} catch (ExecutionException e) {
78+
Throwable cause = e.getCause();
79+
if (cause instanceof IOException) {
80+
throw (IOException)cause;
81+
}
82+
throw new IOException(e);
83+
}
84+
}
85+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router.async;
19+
20+
import java.io.IOException;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.CompletionException;
23+
import java.util.concurrent.Executor;
24+
25+
/**
26+
* The AsyncApplyFunction interface represents a function that
27+
* asynchronously accepts a value of type T and produces a result
28+
* of type R. This interface extends {@link ApplyFunction} and is
29+
* designed to be used with asynchronous computation frameworks,
30+
* such as Java's {@link java.util.concurrent.CompletableFuture}.
31+
*
32+
* <p>An implementation of this interface is expected to perform an
33+
* asynchronous operation and return a result, which is typically
34+
* represented as a {@code CompletableFuture<R>}. This allows for
35+
* non-blocking execution of tasks and is particularly useful for
36+
* I/O operations or any operation that may take a significant amount
37+
* of time to complete.</p>
38+
*
39+
* @param <T> the type of the input to the function
40+
* @param <R> the type of the result of the function
41+
* @see ApplyFunction
42+
* @see java.util.concurrent.CompletableFuture
43+
*/
44+
@FunctionalInterface
45+
public interface AsyncApplyFunction<T, R> extends ApplyFunction<T, R> {
46+
47+
/**
48+
* Asynchronously applies this function to the given argument.
49+
*
50+
* <p>This method is intended to initiate the function application
51+
* without waiting for the result. It is typically used when the
52+
* result of the operation is not required immediately or when the
53+
* operation is part of a larger asynchronous workflow.</p>
54+
*
55+
* @param t the function argument
56+
* @throws IOException if an I/O error occurs during the application
57+
* of the function
58+
*/
59+
void applyAsync(T t) throws IOException;
60+
61+
/**
62+
* Synchronously applies this function to the given argument and
63+
* returns the result.
64+
*
65+
* <p>This method waits for the asynchronous operation to complete
66+
* and returns its result. It is useful when the result is needed
67+
* immediately and the calling code cannot proceed without it.</p>
68+
*
69+
* @param t the function argument
70+
* @return the result of applying the function to the argument
71+
* @throws IOException if an I/O error occurs during the application
72+
* of the function
73+
*/
74+
@Override
75+
default R apply(T t) throws IOException {
76+
applyAsync(t);
77+
return result();
78+
}
79+
80+
81+
default CompletableFuture<R> async(T t) throws IOException {
82+
applyAsync(t);
83+
CompletableFuture<R> completableFuture = getCurCompletableFuture();
84+
assert completableFuture != null;
85+
return completableFuture;
86+
}
87+
88+
/**
89+
* Asynchronously applies this function to the result of the given
90+
* CompletableFuture.
91+
*
92+
* <p>This method chains the function application to the completion
93+
* of the input future. It returns a new CompletableFuture that
94+
* completes with the function's result when the input future
95+
* completes.</p>
96+
*
97+
* @param in the input future
98+
* @return a new CompletableFuture that holds the result of the
99+
* function application
100+
*/
101+
@Override
102+
default CompletableFuture<R> apply(CompletableFuture<T> in) {
103+
return in.thenCompose(t -> {
104+
try {
105+
return async(t);
106+
} catch (IOException e) {
107+
throw new CompletionException(e);
108+
}
109+
});
110+
}
111+
112+
113+
/**
114+
* Asynchronously applies this function to the result of the given
115+
* CompletableFuture, using the specified executor for the
116+
* asynchronous computation.
117+
*
118+
* <p>This method allows for more control over the execution
119+
* context of the asynchronous operation, such as running the
120+
* operation in a separate thread or thread pool.</p>
121+
*
122+
* @param in the input future
123+
* @param executor the executor to use for the asynchronous
124+
* computation
125+
* @return a new CompletableFuture that holds the result of the
126+
* function application
127+
*/
128+
@Override
129+
default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) {
130+
return in.thenComposeAsync(t -> {
131+
try {
132+
return async(t);
133+
} catch (IOException e) {
134+
throw new CompletionException(e);
135+
}
136+
}, executor);
137+
}
138+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router.async;
19+
20+
import java.io.IOException;
21+
import java.util.concurrent.CompletableFuture;
22+
23+
@FunctionalInterface
24+
public interface AsyncCatchFunction<R, E extends Throwable>
25+
extends CatchFunction<R, E> {
26+
27+
void applyAsync(R r, E e) throws IOException;
28+
29+
@Override
30+
default R apply(R r, E e) throws IOException {
31+
applyAsync(r, e);
32+
return result();
33+
}
34+
35+
default CompletableFuture<R> async(R r, E e) throws IOException {
36+
applyAsync(r, e);
37+
CompletableFuture<R> completableFuture = getCurCompletableFuture();
38+
assert completableFuture != null;
39+
return completableFuture;
40+
}
41+
42+
@Override
43+
default CompletableFuture<R> apply(
44+
CompletableFuture<R> in, Class<E> eClazz) {
45+
CompletableFuture<R> result = new CompletableFuture<>();
46+
in.handle((r, e) -> {
47+
if (e == null) {
48+
result.complete(r);
49+
return r;
50+
}
51+
Throwable cause = e.getCause();
52+
53+
if (eClazz.isInstance(cause)) {
54+
try {
55+
async(r, (E) cause).handle((r1, ex) -> {
56+
if (ex != null) {
57+
result.completeExceptionally(ex);
58+
} else {
59+
result.complete(r1);
60+
}
61+
return null;
62+
});
63+
} catch (IOException ioe) {
64+
result.completeExceptionally(ioe);
65+
}
66+
} else {
67+
result.completeExceptionally(cause);
68+
}
69+
return r;
70+
});
71+
return result;
72+
}
73+
}

0 commit comments

Comments
 (0)