2323import java .nio .ByteBuffer ;
2424import java .nio .channels .ClosedChannelException ;
2525import java .util .EnumSet ;
26+ import java .util .Random ;
2627import java .util .concurrent .atomic .AtomicReference ;
2728
2829import org .apache .hadoop .HadoopIllegalArgumentException ;
@@ -126,6 +127,8 @@ public class DFSOutputStream extends FSOutputSummer
126127 protected final AtomicReference <CachingStrategy > cachingStrategy ;
127128 private FileEncryptionInfo fileEncryptionInfo ;
128129 private int writePacketSize ;
130+ private boolean leaseRecovered = false ;
131+ private boolean exceptionInClose = false ; //for unit test
129132
130133 /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
131134 protected DFSPacket createPacket (int packetSize , int chunksPerPkt ,
@@ -836,6 +839,39 @@ protected void closeThreads(boolean force) throws IOException {
836839 }
837840 }
838841
842+ @ VisibleForTesting
843+ public void setExceptionInClose (boolean enable ) {
844+ exceptionInClose = enable ;
845+ }
846+
847+ private class EmulateExceptionInClose {
848+ private Random rand = null ;
849+ private int kickedNum ;
850+
851+ EmulateExceptionInClose (int callNum ) {
852+ if (exceptionInClose ) {
853+ rand = new Random ();
854+ }
855+ kickedNum = callNum ;
856+ }
857+
858+ void kickRandomException () throws IOException {
859+ if (exceptionInClose ) {
860+ if (kickedNum > 0 ) {
861+ if (rand .nextInt (kickedNum ) == 1 ) {
862+ throw new IOException ("Emulated random IOException in close" );
863+ }
864+ }
865+ }
866+ }
867+
868+ void kickException () throws IOException {
869+ if (exceptionInClose ) {
870+ throw new IOException ("Emulated IOException in close" );
871+ }
872+ }
873+ }
874+
839875 /**
840876 * Closes this output stream and releases any system
841877 * resources associated with this stream.
@@ -858,7 +894,20 @@ public void close() throws IOException {
858894 }
859895
860896 protected synchronized void closeImpl () throws IOException {
897+ boolean recoverOnCloseException = dfsClient .getConfiguration ().getBoolean (
898+ HdfsClientConfigKeys .Write .RECOVER_ON_CLOSE_EXCEPTION_KEY ,
899+ HdfsClientConfigKeys .Write .RECOVER_ON_CLOSE_EXCEPTION_DEFAULT );
861900 if (isClosed ()) {
901+ if (recoverOnCloseException && !leaseRecovered ) {
902+ try {
903+ dfsClient .endFileLease (fileId );
904+ dfsClient .recoverLease (src );
905+ leaseRecovered = true ;
906+ } catch (Exception e ) {
907+ LOG .warn ("Fail to recover lease for {}" , src , e );
908+ }
909+ }
910+
862911 LOG .debug ("Closing an already closed stream. [Stream:{}, streamer:{}]" ,
863912 closed , getStreamer ().streamerClosed ());
864913 try {
@@ -875,8 +924,11 @@ protected synchronized void closeImpl() throws IOException {
875924 return ;
876925 }
877926
927+ EmulateExceptionInClose eei = new EmulateExceptionInClose (5 );
878928 try {
879- flushBuffer (); // flush from all upper layers
929+ flushBuffer (); // flush from all upper layers
930+ // for test
931+ eei .kickRandomException ();
880932
881933 if (currentPacket != null ) {
882934 enqueueCurrentPacket ();
@@ -887,12 +939,28 @@ protected synchronized void closeImpl() throws IOException {
887939 }
888940
889941 try {
890- flushInternal (); // flush all data to Datanodes
942+ flushInternal (); // flush all data to Datanodes
891943 } catch (IOException ioe ) {
892944 cleanupAndRethrowIOException (ioe );
893945 }
946+ // for test
947+ eei .kickRandomException ();
894948 completeFile ();
949+ // for test
950+ eei .kickException ();
895951 } catch (ClosedChannelException ignored ) {
952+ } catch (IOException ioe ) {
953+ if (recoverOnCloseException ) {
954+ try {
955+ dfsClient .endFileLease (fileId );
956+ dfsClient .recoverLease (src );
957+ leaseRecovered = true ;
958+ } catch (Exception e ) {
959+ // Ignore exception rendered by recoverLease. Throw original
960+ // exception
961+ }
962+ }
963+ throw ioe ;
896964 } finally {
897965 // Failures may happen when flushing data.
898966 // Streamers may keep waiting for the new block information.
0 commit comments