1+ /**
2+ * Licensed to the Apache Software Foundation (ASF) under one
3+ * or more contributor license agreements. See the NOTICE file
4+ * distributed with this work for additional information
5+ * regarding copyright ownership. The ASF licenses this file
6+ * to you under the Apache License, Version 2.0 (the
7+ * "License"); you may not use this file except in compliance
8+ * with the License. You may obtain a copy of the License at
9+ *
10+ * http://www.apache.org/licenses/LICENSE-2.0
11+ *
12+ * Unless required by applicable law or agreed to in writing, software
13+ * distributed under the License is distributed on an "AS IS" BASIS,
14+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+ * See the License for the specific language governing permissions and
16+ * limitations under the License.
17+ */
18+ package org .apache .hadoop .hdfs .server .federation .router ;
19+
20+ import org .apache .hadoop .conf .Configuration ;
21+ import org .apache .hadoop .fs .FSDataOutputStream ;
22+ import org .apache .hadoop .fs .FileSystem ;
23+ import org .apache .hadoop .fs .Path ;
24+ import org .apache .hadoop .fs .permission .FsPermission ;
25+ import org .apache .hadoop .hdfs .protocol .BlockStoragePolicy ;
26+ import org .apache .hadoop .hdfs .server .federation .MiniRouterDFSCluster ;
27+ import org .apache .hadoop .hdfs .server .federation .MockResolver ;
28+ import org .apache .hadoop .hdfs .server .federation .RouterConfigBuilder ;
29+ import org .apache .hadoop .ipc .CallerContext ;
30+ import org .junit .After ;
31+ import org .junit .AfterClass ;
32+ import org .junit .Before ;
33+ import org .junit .BeforeClass ;
34+ import org .junit .Test ;
35+ import org .mockito .Mockito ;
36+
37+ import java .io .IOException ;
38+ import java .util .concurrent .TimeUnit ;
39+
40+ import static org .apache .hadoop .hdfs .server .federation .FederationTestUtils .NAMENODES ;
41+ import static org .apache .hadoop .hdfs .server .federation .MiniRouterDFSCluster .DEFAULT_HEARTBEAT_INTERVAL_MS ;
42+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT ;
43+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT ;
44+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .syncReturn ;
45+ import static org .junit .Assert .assertArrayEquals ;
46+ import static org .junit .Assert .assertEquals ;
47+ import static org .junit .Assert .assertNotEquals ;
48+ import static org .junit .Assert .assertTrue ;
49+
50+ public class TestRouterAsyncStoragePolicy {
51+ private static Configuration routerConf ;
52+ /** Federated HDFS cluster. */
53+ private static MiniRouterDFSCluster cluster ;
54+ private static String ns0 ;
55+
56+ /** Random Router for this federated cluster. */
57+ private MiniRouterDFSCluster .RouterContext router ;
58+ private FileSystem routerFs ;
59+ private RouterRpcServer routerRpcServer ;
60+ private RouterAsyncStoragePolicy asyncStoragePolicy ;
61+
62+ private final String testfilePath = "/testdir/testAsyncStoragePolicy.file" ;
63+
64+ @ BeforeClass
65+ public static void setUpCluster () throws Exception {
66+ cluster = new MiniRouterDFSCluster (true , 1 , 2 ,
67+ DEFAULT_HEARTBEAT_INTERVAL_MS , 1000 );
68+ cluster .setNumDatanodesPerNameservice (3 );
69+
70+ cluster .startCluster ();
71+
72+ // Making one Namenode active per nameservice
73+ if (cluster .isHighAvailability ()) {
74+ for (String ns : cluster .getNameservices ()) {
75+ cluster .switchToActive (ns , NAMENODES [0 ]);
76+ cluster .switchToStandby (ns , NAMENODES [1 ]);
77+ }
78+ }
79+ // Start routers with only an RPC service
80+ routerConf = new RouterConfigBuilder ()
81+ .rpc ()
82+ .build ();
83+
84+ // Reduce the number of RPC clients threads to overload the Router easy
85+ routerConf .setInt (RBFConfigKeys .DFS_ROUTER_CLIENT_THREADS_SIZE , 1 );
86+ routerConf .setInt (DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT , 1 );
87+ routerConf .setInt (DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT , 1 );
88+ // We decrease the DN cache times to make the test faster
89+ routerConf .setTimeDuration (
90+ RBFConfigKeys .DN_REPORT_CACHE_EXPIRE , 1 , TimeUnit .SECONDS );
91+ cluster .addRouterOverrides (routerConf );
92+ // Start routers with only an RPC service
93+ cluster .startRouters ();
94+
95+ // Register and verify all NNs with all routers
96+ cluster .registerNamenodes ();
97+ cluster .waitNamenodeRegistration ();
98+ cluster .waitActiveNamespaces ();
99+ ns0 = cluster .getNameservices ().get (0 );
100+ }
101+
102+ @ AfterClass
103+ public static void shutdownCluster () throws Exception {
104+ if (cluster != null ) {
105+ cluster .shutdown ();
106+ }
107+ }
108+
109+ @ Before
110+ public void setUp () throws IOException {
111+ router = cluster .getRandomRouter ();
112+ routerFs = router .getFileSystem ();
113+ routerRpcServer = router .getRouterRpcServer ();
114+ routerRpcServer .initAsyncThreadPool ();
115+ RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient (
116+ routerConf , router .getRouter (), routerRpcServer .getNamenodeResolver (),
117+ routerRpcServer .getRPCMonitor (),
118+ routerRpcServer .getRouterStateIdContext ());
119+ RouterRpcServer spy = Mockito .spy (routerRpcServer );
120+ Mockito .when (spy .getRPCClient ()).thenReturn (asyncRpcClient );
121+ asyncStoragePolicy = new RouterAsyncStoragePolicy (spy );
122+
123+ // Create mock locations
124+ MockResolver resolver = (MockResolver ) router .getRouter ().getSubclusterResolver ();
125+ resolver .addLocation ("/" , ns0 , "/" );
126+ FsPermission permission = new FsPermission ("705" );
127+ routerFs .mkdirs (new Path ("/testdir" ), permission );
128+ FSDataOutputStream fsDataOutputStream = routerFs .create (
129+ new Path (testfilePath ), true );
130+ fsDataOutputStream .write (new byte [1024 ]);
131+ fsDataOutputStream .close ();
132+ }
133+
134+ @ After
135+ public void tearDown () throws IOException {
136+ // clear client context
137+ CallerContext .setCurrent (null );
138+ boolean delete = routerFs .delete (new Path ("/testdir" ));
139+ assertTrue (delete );
140+ if (routerFs != null ) {
141+ routerFs .close ();
142+ }
143+ }
144+
145+ @ Test
146+ public void testRouterAsyncStoragePolicy () throws Exception {
147+ BlockStoragePolicy [] storagePolicies = cluster .getNamenodes ().get (0 )
148+ .getClient ().getStoragePolicies ();
149+ asyncStoragePolicy .getStoragePolicies ();
150+ BlockStoragePolicy [] storagePoliciesAsync = syncReturn (BlockStoragePolicy [].class );
151+ assertArrayEquals (storagePolicies , storagePoliciesAsync );
152+
153+ asyncStoragePolicy .getStoragePolicy (testfilePath );
154+ BlockStoragePolicy blockStoragePolicy1 = syncReturn (BlockStoragePolicy .class );
155+
156+ asyncStoragePolicy .setStoragePolicy (testfilePath , "COLD" );
157+ syncReturn (null );
158+ asyncStoragePolicy .getStoragePolicy (testfilePath );
159+ BlockStoragePolicy blockStoragePolicy2 = syncReturn (BlockStoragePolicy .class );
160+ assertNotEquals (blockStoragePolicy1 , blockStoragePolicy2 );
161+ assertEquals ("COLD" , blockStoragePolicy2 .getName ());
162+ }
163+ }
0 commit comments