 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;

 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
     .putBlockAsync;
@@ -97,7 +93,6 @@ public class BlockOutputStream extends OutputStream {
   private int chunkSize;
   private final long streamBufferFlushSize;
   private final long streamBufferMaxSize;
-  private final long watchTimeout;
   private BufferPool bufferPool;
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
@@ -111,10 +106,6 @@ public class BlockOutputStream extends OutputStream {
   // effective data write attempted so far for the block
   private long writtenDataLength;

-  // total data which has been successfully flushed and acknowledged
-  // by all servers
-  private long totalAckDataLength;
-
   // List containing buffers for which the putBlock call will
   // update the length in the datanodes. This list will just maintain
   // references to the buffers in the BufferPool which will be cleared
@@ -123,17 +114,10 @@ public class BlockOutputStream extends OutputStream {
   // which got written between successive putBlock calls.
   private List<ByteBuffer> bufferList;

-  // future Map to hold up all putBlock futures
-  private ConcurrentHashMap<Long,
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
-      futureMap;
-
-  // The map should maintain the keys (logIndexes) in order so that while
-  // removing we always end up updating incremented data flushed length.
+  // This object will maintain the commitIndexes and byteBufferList in order
   // Also, corresponding to the logIndex, the corresponding list of buffers will
   // be released from the buffer pool.
-  private ConcurrentSkipListMap<Long, List<ByteBuffer>>
-      commitIndex2flushedDataMap;
+  private final CommitWatcher commitWatcher;

   private List<DatanodeDetails> failedServers;
@@ -175,20 +159,17 @@ public BlockOutputStream(BlockID blockID, String key,
     this.chunkIndex = 0;
     this.streamBufferFlushSize = streamBufferFlushSize;
     this.streamBufferMaxSize = streamBufferMaxSize;
-    this.watchTimeout = watchTimeout;
     this.bufferPool = bufferPool;
     this.checksumType = checksumType;
     this.bytesPerChecksum = bytesPerChecksum;

     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
-    totalAckDataLength = 0;
-    futureMap = new ConcurrentHashMap<>();
+    commitWatcher = new CommitWatcher(bufferPool, xceiverClient, watchTimeout);
+    bufferList = null;
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
     failedServers = Collections.emptyList();
-    bufferList = null;
     ioException = new AtomicReference<>(null);
   }

@@ -198,7 +179,7 @@ public BlockID getBlockID() {
   }

   public long getTotalAckDataLength() {
-    return totalAckDataLength;
+    return commitWatcher.getTotalAckDataLength();
   }

   public long getWrittenDataLength() {
@@ -230,7 +211,7 @@ public IOException getIoException() {

   @VisibleForTesting
   public Map<Long, List<ByteBuffer>> getCommitIndex2flushedDataMap() {
-    return commitIndex2flushedDataMap;
+    return commitWatcher.getCommitIndex2flushedDataMap();
   }

   @Override
@@ -333,34 +314,6 @@ public void writeOnRetry(long len) throws IOException {
     }
   }

-  /**
-   * just update the totalAckDataLength. In case of failure,
-   * we will read the data starting from totalAckDataLength.
-   */
-  private void updateFlushIndex(List<Long> indexes) {
-    Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
-    for (long index : indexes) {
-      Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
-      List<ByteBuffer> buffers = commitIndex2flushedDataMap.remove(index);
-      long length = buffers.stream().mapToLong(value -> {
-        int pos = value.position();
-        Preconditions.checkArgument(pos <= chunkSize);
-        return pos;
-      }).sum();
-      // totalAckDataLength replicated yet should always be incremented
-      // with the current length being returned from commitIndex2flushedDataMap.
-      totalAckDataLength += length;
-      LOG.debug("Total data successfully replicated: " + totalAckDataLength);
-      futureMap.remove(totalAckDataLength);
-      // Flush has been committed to required servers successful.
-      // just release the current buffer from the buffer pool corresponding
-      // to the buffers that have been committed with the putBlock call.
-      for (ByteBuffer byteBuffer : buffers) {
-        bufferPool.releaseBuffer(byteBuffer);
-      }
-    }
-  }
-
   /**
    * This is a blocking call. It will wait for the flush till the commit index
    * at the head of the commitIndex2flushedDataMap gets replicated to all or
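
The deleted updateFlushIndex logic presumably moves into CommitWatcher largely intact. A minimal sketch of that release path under that assumption, with BufferPool reduced to a hypothetical release callback:

// Hedged sketch: the acknowledged-buffer release path that used to live in
// updateFlushIndex(), assumed to move into CommitWatcher. The Consumer
// stands in for BufferPool#releaseBuffer.
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Consumer;

class ReleaseSketch {
  private final ConcurrentSkipListMap<Long, List<ByteBuffer>>
      commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
  private long totalAckDataLength;

  long releaseBuffers(List<Long> indexes, Consumer<ByteBuffer> releaseBuffer) {
    for (long index : indexes) {
      List<ByteBuffer> buffers = commitIndex2flushedDataMap.remove(index);
      // A buffer's position is the number of bytes written into it, so the
      // sum is the data length acknowledged by this commit index.
      totalAckDataLength +=
          buffers.stream().mapToLong(ByteBuffer::position).sum();
      buffers.forEach(releaseBuffer);
    }
    return totalAckDataLength;
  }
}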
@@ -370,69 +323,49 @@ private void updateFlushIndex(List<Long> indexes) {
   private void handleFullBuffer() throws IOException {
     try {
       checkOpen();
-      if (!futureMap.isEmpty()) {
+      if (!commitWatcher.getFutureMap().isEmpty()) {
         waitOnFlushFutures();
       }
     } catch (InterruptedException | ExecutionException e) {
       setIoException(e);
       adjustBuffersOnException();
       throw getIoException();
     }
-    if (!commitIndex2flushedDataMap.isEmpty()) {
-      watchForCommit(
-          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
-              .min().getAsLong());
-    }
+    watchForCommit(true);
   }

-  private void adjustBuffers(long commitIndex) {
-    List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
-        .filter(p -> p <= commitIndex).collect(Collectors.toList());
-    if (keyList.isEmpty()) {
-      return;
-    } else {
-      updateFlushIndex(keyList);
-    }
-  }
-
   // It may happen that once the exception is encountered, we still might
   // have successfully flushed up to a certain index. Make sure the buffers
   // only contain data which have not been sufficiently replicated
   private void adjustBuffersOnException() {
-    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+    commitWatcher.releaseBuffersOnException();
   }

   /**
    * calls watchForCommit API of the Ratis Client. For Standalone client,
    * it is a no op.
-   * @param commitIndex log index to watch for
+   * @param bufferFull flag indicating whether the buffer-full condition was
+   *                   hit or whether this call is part of flush/close
    * @return minimum commit index replicated to all nodes
    * @throws IOException IOException in case watch gets timed out
    */
-  private void watchForCommit(long commitIndex) throws IOException {
+  private void watchForCommit(boolean bufferFull) throws IOException {
     checkOpen();
-    Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
-    long index;
     try {
-      XceiverClientReply reply =
-          xceiverClient.watchForCommit(commitIndex, watchTimeout);
-      if (reply == null) {
-        index = 0;
-      } else {
+      XceiverClientReply reply = bufferFull ?
+          commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
+      if (reply != null) {
         List<DatanodeDetails> dnList = reply.getDatanodes();
         if (!dnList.isEmpty()) {
           if (failedServers.isEmpty()) {
             failedServers = new ArrayList<>();
           }
           failedServers.addAll(dnList);
         }
-        index = reply.getLogIndex();
       }
-      adjustBuffers(index);
-    } catch (TimeoutException | InterruptedException | ExecutionException e) {
-      LOG.warn("watchForCommit failed for index " + commitIndex, e);
-      setIoException(e);
-      adjustBuffersOnException();
+    } catch (IOException ioe) {
+      setIoException(ioe);
       throw getIoException();
     }
   }
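
Judging by the min()/max() stream calls deleted here and in handleFlush below, watchOnFirstIndex and watchOnLastIndex plausibly reduce to watching the sorted map's first or last key. A sketch under that assumption, with the Ratis watch call abstracted behind a hypothetical interface:

// Hedged sketch of the two watch flavors; Watcher stands in for
// xceiverClient.watchForCommit(index, watchTimeout).
import java.io.IOException;
import java.util.concurrent.ConcurrentSkipListMap;

class WatchSketch {
  interface Watcher {
    void watch(long commitIndex) throws IOException;
  }

  private final ConcurrentSkipListMap<Long, Object>
      commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();

  // handleFullBuffer path (bufferFull == true): wait for the OLDEST pending
  // index so buffers can be released as early as possible.
  void watchOnFirstIndex(Watcher watcher) throws IOException {
    if (!commitIndex2flushedDataMap.isEmpty()) {
      watcher.watch(commitIndex2flushedDataMap.firstKey());
    }
  }

  // flush/close path: wait until everything written so far is replicated.
  void watchOnLastIndex(Watcher watcher) throws IOException {
    if (!commitIndex2flushedDataMap.isEmpty()) {
      watcher.watch(commitIndex2flushedDataMap.lastKey());
    }
  }
}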
@@ -471,14 +404,14 @@ ContainerCommandResponseProto> executePutBlock()
           blockID = responseBlockID;
           LOG.debug(
               "Adding index " + asyncReply.getLogIndex() + " commitMap size "
-                  + commitIndex2flushedDataMap.size() + " flushLength "
+                  + commitWatcher.getCommitInfoMapSize() + " flushLength "
                   + flushPos + " numBuffers " + byteBufferList.size()
                   + " blockID " + blockID + " bufferPool size" + bufferPool
                   .getSize() + " currentBufferIndex " + bufferPool
                   .getCurrentBufferIndex());
           // for standalone protocol, logIndex will always be 0.
-          commitIndex2flushedDataMap
-              .put(asyncReply.getLogIndex(), byteBufferList);
+          commitWatcher
+              .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
         }
         return e;
       }, responseExecutor).exceptionally(e -> {
@@ -493,7 +426,7 @@ ContainerCommandResponseProto> executePutBlock()
       throw new IOException(
           "Unexpected Storage Container Exception: " + e.toString(), e);
     }
-    futureMap.put(flushPos, flushFuture);
+    commitWatcher.getFutureMap().put(flushPos, flushFuture);
     return flushFuture;
   }

@@ -553,18 +486,7 @@ private void handleFlush()
       executePutBlock();
     }
     waitOnFlushFutures();
-    if (!commitIndex2flushedDataMap.isEmpty()) {
-      // wait for the last commit index in the commitIndex2flushedDataMap
-      // to get committed to all or majority of nodes in case timeout
-      // happens.
-      long lastIndex =
-          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
-              .max().getAsLong();
-      LOG.debug(
-          "waiting for last flush Index " + lastIndex + " to catch up");
-      watchForCommit(lastIndex);
-    }
-
+    watchForCommit(false);
     // just check again if the exception is hit while waiting for the
     // futures to ensure flush has indeed succeeded

@@ -594,11 +516,11 @@ public void close() throws IOException {
     }
   }

-
   private void waitOnFlushFutures()
       throws InterruptedException, ExecutionException {
     CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
-        futureMap.values().toArray(new CompletableFuture[futureMap.size()]));
+        commitWatcher.getFutureMap().values().toArray(
+            new CompletableFuture[commitWatcher.getFutureMap().size()]));
     // wait for all the transactions to complete
     combinedFuture.get();
   }
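
waitOnFlushFutures barriers on every outstanding putBlock response before the commit watch runs. The same CompletableFuture.allOf pattern in isolation (class and method names here are placeholders):

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class FlushBarrierSketch {
  // Block until every pending future completes; any failure surfaces as an
  // ExecutionException from get(), which the caller turns into IOException.
  static void waitAll(Map<Long, CompletableFuture<?>> futureMap)
      throws InterruptedException, ExecutionException {
    CompletableFuture.allOf(
        futureMap.values().toArray(new CompletableFuture[0])).get();
  }
}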
@@ -637,18 +559,11 @@ public void cleanup(boolean invalidateClient) {
     }
     xceiverClientManager = null;
     xceiverClient = null;
-    if (futureMap != null) {
-      futureMap.clear();
-    }
-    futureMap = null;
+    commitWatcher.cleanup();
     if (bufferList != null) {
       bufferList.clear();
     }
     bufferList = null;
-    if (commitIndex2flushedDataMap != null) {
-      commitIndex2flushedDataMap.clear();
-    }
-    commitIndex2flushedDataMap = null;
     responseExecutor.shutdown();
   }

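commitWatcher.cleanup() presumably centralizes the map clearing that this hunk deletes from BlockOutputStream.cleanup(). A sketch of that presumed shape:

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

class CleanupSketch {
  private ConcurrentHashMap<Long, CompletableFuture<?>> futureMap =
      new ConcurrentHashMap<>();
  private ConcurrentSkipListMap<Long, List<ByteBuffer>>
      commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();

  // Presumed shape of CommitWatcher#cleanup(): clear and drop both maps,
  // mirroring the inline clearing removed above.
  void cleanup() {
    if (futureMap != null) {
      futureMap.clear();
    }
    futureMap = null;
    if (commitIndex2flushedDataMap != null) {
      commitIndex2flushedDataMap.clear();
    }
    commitIndex2flushedDataMap = null;
  }
}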