3232
3333import java .io .IOException ;
3434import java .io .InterruptedIOException ;
35+ import java .lang .reflect .Constructor ;
3536import java .lang .reflect .InvocationTargetException ;
3637import java .lang .reflect .Method ;
3738import java .util .ArrayList ;
@@ -140,9 +141,9 @@ private FanOutOneBlockAsyncDFSOutputHelper() {
140141
141142 private interface LeaseManager {
142143
143- void begin (DFSClient client , HdfsFileStatus stat );
144+ void begin (FanOutOneBlockAsyncDFSOutput output );
144145
145- void end (DFSClient client , HdfsFileStatus stat );
146+ void end (FanOutOneBlockAsyncDFSOutput output );
146147 }
147148
148149 private static final LeaseManager LEASE_MANAGER ;
@@ -178,6 +179,16 @@ Object createObject(ClientProtocol instance, String src, FsPermission masked, St
178179 CryptoProtocolVersion [] supportedVersions ) throws Exception ;
179180 }
180181
182+ // helper class for creating the dummy DFSOutputStream
183+ private interface DummyDFSOutputStreamCreator {
184+
185+ DFSOutputStream createDummyDFSOutputStream (AsyncFSOutput output , DFSClient dfsClient ,
186+ String src , HdfsFileStatus stat , EnumSet <CreateFlag > flag , DataChecksum checksum );
187+ }
188+
189+ private static final DummyDFSOutputStreamCreator DUMMY_DFS_OUTPUT_STREAM_CREATOR =
190+ createDummyDFSOutputStreamCreator ();
191+
181192 private static final FileCreator FILE_CREATOR ;
182193
183194 // CreateFlag.SHOULD_REPLICATE is to make OutputStream on a EC directory support hflush/hsync, but
@@ -207,44 +218,28 @@ private static LeaseManager createLeaseManager3_4() throws NoSuchMethodException
207218 beginFileLeaseMethod .setAccessible (true );
208219 Method endFileLeaseMethod = DFSClient .class .getDeclaredMethod ("endFileLease" , String .class );
209220 endFileLeaseMethod .setAccessible (true );
210- Method getConfigurationMethod = DFSClient .class .getDeclaredMethod ("getConfiguration" );
211- getConfigurationMethod .setAccessible (true );
212- Method getNamespaceMehtod = HdfsFileStatus .class .getDeclaredMethod ("getNamespace" );
213-
221+ Method getUniqKeyMethod = DFSOutputStream .class .getMethod ("getUniqKey" );
214222 return new LeaseManager () {
215223
216- private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
217- "dfs.client.output.stream.uniq.default.key" ;
218- private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT" ;
219-
220- private String getUniqId (DFSClient client , HdfsFileStatus stat )
221- throws IllegalAccessException , IllegalArgumentException , InvocationTargetException {
222- // Copied from DFSClient in Hadoop 3.4.0
223- long fileId = stat .getFileId ();
224- String namespace = (String ) getNamespaceMehtod .invoke (stat );
225- if (namespace == null ) {
226- Configuration conf = (Configuration ) getConfigurationMethod .invoke (client );
227- String defaultKey = conf .get (DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY ,
228- DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT );
229- return defaultKey + "_" + fileId ;
230- } else {
231- return namespace + "_" + fileId ;
232- }
224+ private String getUniqKey (FanOutOneBlockAsyncDFSOutput output )
225+ throws IllegalAccessException , InvocationTargetException {
226+ return (String ) getUniqKeyMethod .invoke (output .getDummyStream ());
233227 }
234228
235229 @ Override
236- public void begin (DFSClient client , HdfsFileStatus stat ) {
230+ public void begin (FanOutOneBlockAsyncDFSOutput output ) {
237231 try {
238- beginFileLeaseMethod .invoke (client , getUniqId (client , stat ), null );
232+ beginFileLeaseMethod .invoke (output .getClient (), getUniqKey (output ),
233+ output .getDummyStream ());
239234 } catch (IllegalAccessException | InvocationTargetException e ) {
240235 throw new RuntimeException (e );
241236 }
242237 }
243238
244239 @ Override
245- public void end (DFSClient client , HdfsFileStatus stat ) {
240+ public void end (FanOutOneBlockAsyncDFSOutput output ) {
246241 try {
247- endFileLeaseMethod .invoke (client , getUniqId ( client , stat ));
242+ endFileLeaseMethod .invoke (output . getClient (), getUniqKey ( output ));
248243 } catch (IllegalAccessException | InvocationTargetException e ) {
249244 throw new RuntimeException (e );
250245 }
@@ -261,18 +256,19 @@ private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
261256 return new LeaseManager () {
262257
263258 @ Override
264- public void begin (DFSClient client , HdfsFileStatus stat ) {
259+ public void begin (FanOutOneBlockAsyncDFSOutput output ) {
265260 try {
266- beginFileLeaseMethod .invoke (client , stat .getFileId (), null );
261+ beginFileLeaseMethod .invoke (output .getClient (), output .getStat ().getFileId (),
262+ output .getDummyStream ());
267263 } catch (IllegalAccessException | InvocationTargetException e ) {
268264 throw new RuntimeException (e );
269265 }
270266 }
271267
272268 @ Override
273- public void end (DFSClient client , HdfsFileStatus stat ) {
269+ public void end (FanOutOneBlockAsyncDFSOutput output ) {
274270 try {
275- endFileLeaseMethod .invoke (client , stat .getFileId ());
271+ endFileLeaseMethod .invoke (output . getClient (), output . getStat () .getFileId ());
276272 } catch (IllegalAccessException | InvocationTargetException e ) {
277273 throw new RuntimeException (e );
278274 }
@@ -341,6 +337,28 @@ private static FileCreator createFileCreator() throws NoSuchMethodException {
341337 return createFileCreator2 ();
342338 }
343339
340+ private static final String DUMMY_DFS_OUTPUT_STREAM_CLASS =
341+ "org.apache.hadoop.hdfs.DummyDFSOutputStream" ;
342+
343+ @ SuppressWarnings ("unchecked" )
344+ private static DummyDFSOutputStreamCreator createDummyDFSOutputStreamCreator () {
345+ Constructor <? extends DFSOutputStream > constructor ;
346+ try {
347+ constructor = (Constructor <? extends DFSOutputStream >) Class
348+ .forName (DUMMY_DFS_OUTPUT_STREAM_CLASS ).getConstructors ()[0 ];
349+ return (output , dfsClient , src , stat , flag , checksum ) -> {
350+ try {
351+ return constructor .newInstance (output , dfsClient , src , stat , flag , checksum );
352+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException e ) {
353+ throw new RuntimeException (e );
354+ }
355+ };
356+ } catch (Exception e ) {
357+ LOG .debug ("can not find DummyDFSOutputStream, should be hadoop 2.x" , e );
358+ return (output , dfsClient , src , stat , flag , checksum ) -> null ;
359+ }
360+ }
361+
344362 private static CreateFlag loadShouldReplicateFlag () {
345363 try {
346364 return CreateFlag .valueOf ("SHOULD_REPLICATE" );
@@ -380,12 +398,12 @@ public boolean progress() {
380398 }
381399 }
382400
383- static void beginFileLease (DFSClient client , HdfsFileStatus stat ) {
384- LEASE_MANAGER .begin (client , stat );
401+ private static void beginFileLease (FanOutOneBlockAsyncDFSOutput output ) {
402+ LEASE_MANAGER .begin (output );
385403 }
386404
387- static void endFileLease (DFSClient client , HdfsFileStatus stat ) {
388- LEASE_MANAGER .end (client , stat );
405+ static void endFileLease (FanOutOneBlockAsyncDFSOutput output ) {
406+ LEASE_MANAGER .end (output );
389407 }
390408
391409 static DataChecksum createChecksum (DFSClient client ) {
@@ -599,20 +617,19 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
599617 LOG .debug ("When create output stream for {}, exclude list is {}, retry={}" , src ,
600618 getDataNodeInfo (toExcludeNodes ), retry );
601619 }
620+ EnumSetWritable <CreateFlag > createFlags = getCreateFlags (overwrite , noLocalWrite );
602621 HdfsFileStatus stat ;
603622 try {
604623 stat = FILE_CREATOR .create (namenode , src ,
605624 FsPermission .getFileDefault ().applyUMask (FsPermission .getUMask (conf )), clientName ,
606- getCreateFlags (overwrite , noLocalWrite ), createParent , replication , blockSize ,
607- CryptoProtocolVersion .supported ());
625+ createFlags , createParent , replication , blockSize , CryptoProtocolVersion .supported ());
608626 } catch (Exception e ) {
609627 if (e instanceof RemoteException ) {
610628 throw (RemoteException ) e ;
611629 } else {
612630 throw new NameNodeException (e );
613631 }
614632 }
615- beginFileLease (client , stat );
616633 boolean succ = false ;
617634 LocatedBlock locatedBlock = null ;
618635 List <Future <Channel >> futureList = null ;
@@ -637,7 +654,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
637654 Encryptor encryptor = createEncryptor (conf , stat , client );
638655 FanOutOneBlockAsyncDFSOutput output =
639656 new FanOutOneBlockAsyncDFSOutput (conf , dfs , client , namenode , clientName , src , stat ,
640- locatedBlock , encryptor , datanodes , summer , ALLOC , monitor );
657+ createFlags .get (), locatedBlock , encryptor , datanodes , summer , ALLOC , monitor );
658+ beginFileLease (output );
641659 succ = true ;
642660 return output ;
643661 } catch (RemoteException e ) {
@@ -676,7 +694,6 @@ public void operationComplete(Future<Channel> future) throws Exception {
676694 });
677695 }
678696 }
679- endFileLease (client , stat );
680697 }
681698 }
682699 }
@@ -713,13 +730,14 @@ public static boolean shouldRetryCreate(RemoteException e) {
713730 return e .getClassName ().endsWith ("RetryStartFileException" );
714731 }
715732
716- static void completeFile (DFSClient client , ClientProtocol namenode , String src , String clientName ,
717- ExtendedBlock block , HdfsFileStatus stat ) throws IOException {
733+ static void completeFile (FanOutOneBlockAsyncDFSOutput output , DFSClient client ,
734+ ClientProtocol namenode , String src , String clientName , ExtendedBlock block ,
735+ HdfsFileStatus stat ) throws IOException {
718736 int maxRetries = client .getConf ().getNumBlockWriteLocateFollowingRetry ();
719737 for (int retry = 0 ; retry < maxRetries ; retry ++) {
720738 try {
721739 if (namenode .complete (src , clientName , block , stat .getFileId ())) {
722- endFileLease (client , stat );
740+ endFileLease (output );
723741 return ;
724742 } else {
725743 LOG .warn ("complete file " + src + " not finished, retry = " + retry );
@@ -749,4 +767,10 @@ public static String getDataNodeInfo(Collection<DatanodeInfo> datanodeInfos) {
749767 .append (datanodeInfo .getInfoPort ()).append (")" ).toString ())
750768 .collect (Collectors .joining ("," , "[" , "]" ));
751769 }
770+
771+ static DFSOutputStream createDummyDFSOutputStream (AsyncFSOutput output , DFSClient dfsClient ,
772+ String src , HdfsFileStatus stat , EnumSet <CreateFlag > flag , DataChecksum checksum ) {
773+ return DUMMY_DFS_OUTPUT_STREAM_CREATOR .createDummyDFSOutputStream (output , dfsClient , src , stat ,
774+ flag , checksum );
775+ }
752776}
0 commit comments