5959import static org .junit .Assert .assertNotNull ;
6060import static org .junit .Assert .assertTrue ;
6161
62+ /**
63+ * Used to test the functionality of {@link RouterAsyncRpcClient}.
64+ */
6265public class TestRouterAsyncRpcClient {
6366 private static Configuration routerConf ;
6467 /** Federated HDFS cluster. */
@@ -72,8 +75,12 @@ public class TestRouterAsyncRpcClient {
7275 private RouterRpcServer routerRpcServer ;
7376 private RouterAsyncRpcClient asyncRpcClient ;
7477 private FederationRPCMetrics rpcMetrics ;
78+ private final String testFile = "/test.file" ;
7579
76-
80+ /**
81+ * Start a cluster using a router service that includes 2 namespaces,
82+ * 6 namenodes and 6 datanodes.
83+ */
7784 @ BeforeClass
7885 public static void setUpCluster () throws Exception {
7986 cluster = new MiniRouterDFSCluster (true , 2 , 3 ,
@@ -116,12 +123,15 @@ public static void setUpCluster() throws Exception {
116123 }
117124
118125 @ AfterClass
119- public static void shutdownCluster () throws Exception {
126+ public static void shutdownCluster () {
120127 if (cluster != null ) {
121128 cluster .shutdown ();
122129 }
123130 }
124131
132+ /**
133+ * Initialize the mount table, create a RouterAsyncRpcClient object, and create test file.
134+ */
125135 @ Before
126136 public void setup () throws Exception {
127137 // Create mock locations
@@ -132,13 +142,16 @@ public void setup() throws Exception {
132142 routerFs = router .getFileSystem ();
133143 routerRpcServer = router .getRouterRpcServer ();
134144 routerRpcServer .initAsyncThreadPool ();
145+
146+ // Create a RouterAsyncRpcClient object
135147 asyncRpcClient = new RouterAsyncRpcClient (
136148 routerConf , router .getRouter (), routerRpcServer .getNamenodeResolver (),
137149 routerRpcServer .getRPCMonitor (),
138150 routerRpcServer .getRouterStateIdContext ());
139151
152+ // Create a test file
140153 FSDataOutputStream fsDataOutputStream = routerFs .create (
141- new Path ("/test.file" ), true );
154+ new Path (testFile ), true );
142155 fsDataOutputStream .write (new byte [1024 ]);
143156 fsDataOutputStream .close ();
144157 }
@@ -151,13 +164,17 @@ public void down() throws IOException {
151164 asyncRpcClient .getNamenodeResolver ().updateActiveNamenode (
152165 ns0 , NetUtils .createSocketAddr (cluster
153166 .getNamenode (ns0 , NAMENODES [0 ]).getRpcAddress ()));
154- boolean delete = routerFs .delete (new Path ("/test.file" ));
167+ // Delete the test file
168+ boolean delete = routerFs .delete (new Path (testFile ));
155169 assertTrue (delete );
156170 if (routerFs != null ) {
157171 routerFs .close ();
158172 }
159173 }
160174
175+ /**
176+ * Test the functionality of the asynchronous invokeSingle method.
177+ */
161178 @ Test
162179 public void testInvokeSingle () throws Exception {
163180 long proxyOps = rpcMetrics .getProxyOps ();
@@ -173,6 +190,9 @@ public void testInvokeSingle() throws Exception {
173190 assertTrue (rpcMetrics .getProxyAvg () > 0 );
174191 }
175192
193+ /**
194+ * Test the functionality of the asynchronous invokeAll and invokeConcurrent methods.
195+ */
176196 @ Test
177197 public void testInvokeAll () throws Exception {
178198 long proxyOps = rpcMetrics .getProxyOps ();
@@ -206,6 +226,9 @@ public void testInvokeAll() throws Exception {
206226 assertTrue (rpcMetrics .getProxyAvg () > 0 );
207227 }
208228
229+ /**
230+ * Test the functionality of the asynchronous invokeMethod method.
231+ */
209232 @ Test
210233 public void testInvokeMethod () throws Exception {
211234 long proxyOps = rpcMetrics .getProxyOps ();
@@ -214,7 +237,7 @@ public void testInvokeMethod() throws Exception {
214237 new Class <?>[] {String .class }, new RemoteParam ());
215238 UserGroupInformation ugi = RouterRpcServer .getRemoteUser ();
216239 Class <?> protocol = method .getProtocol ();
217- Object [] params = new String []{"/test.file" };
240+ Object [] params = new String []{testFile };
218241 List <? extends FederationNamenodeContext > namenodes =
219242 asyncRpcClient .getOrderedNamenodes (ns0 , false );
220243 asyncRpcClient .invokeMethod (ugi , namenodes , false ,
@@ -257,10 +280,13 @@ public void testInvokeMethod() throws Exception {
257280 assertEquals (1 , rpcMetrics .getProxyOpFailureCommunicate ());
258281 }
259282
283+ /**
284+ * Test the functionality of the asynchronous invokeSequential method.
285+ */
260286 @ Test
261287 public void testInvokeSequential () throws Exception {
262288 List <RemoteLocation > locations =
263- routerRpcServer .getLocationsForPath ("/test.file" , false , false );
289+ routerRpcServer .getLocationsForPath (testFile , false , false );
264290 RemoteMethod remoteMethod = new RemoteMethod ("getBlockLocations" ,
265291 new Class <?>[] {String .class , long .class , long .class },
266292 new RemoteParam (), 0 , 1024 );
@@ -271,6 +297,9 @@ public void testInvokeSequential() throws Exception {
271297 assertEquals (1 , locatedBlocks .getLocatedBlocks ().size ());
272298 }
273299
300+ /**
301+ * Initialize the mount information.
302+ */
274303 private void installMockLocations () {
275304 List <MiniRouterDFSCluster .RouterContext > routers = cluster .getRouters ();
276305
@@ -282,4 +311,4 @@ private void installMockLocations() {
282311 resolver .addLocation ("/multDes" , ns1 , "/multDes" );
283312 }
284313 }
285- }
314+ }
0 commit comments