- 
                Notifications
    You must be signed in to change notification settings 
- Fork 949
Parallel split for multipart GetObject File Download #6425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
          
     Open
      
      
            L-Applin
  wants to merge
  58
  commits into
  feature/master/large-object-dl
  
    
      
        
          
  
    
      Choose a base branch
      
     
    
      
        
      
      
        
          
          
        
        
          
            
              
              
              
  
           
        
        
          
            
              
              
           
        
       
     
  
        
          
            
          
            
          
        
       
    
      
from
olapplin/large-object-merge
  
      
      
   
  
    
  
  
  
 
  
      
    base: feature/master/large-object-dl
Could not load branches
            
              
  
    Branch not found: {{ refName }}
  
            
                
      Loading
              
            Could not load tags
            
            
              Nothing to show
            
              
  
            
                
      Loading
              
            Are you sure you want to change the base?
            Some commits from the old base branch may be removed from the timeline,
            and old review comments may become outdated.
          
          
  
     Open
                    Changes from all commits
      Commits
    
    
            Show all changes
          
          
            58 commits
          
        
        Select commit
          Hold shift + click to select a range
      
      c5ed15e
              
                exploration
              
              
                L-Applin 067db30
              
                hide cloudwatch logs for s3 regression tests
              
              
                L-Applin 41be0a6
              
                Merge remote-tracking branch 'origin/master'
              
              
                L-Applin 51357ee
              
                Multipart dowload implementation
              
              
                L-Applin a9d25ec
              
                Multipart dowload implementation
              
              
                L-Applin b70ab8b
              
                Merge remote-tracking branch 'origin/master'
              
              
                L-Applin 0a8f6c5
              
                Merge remote-tracking branch 'origin/master'
              
              
                L-Applin 7c82e2b
              
                Update non-linear download logic
              
              
                L-Applin 978f553
              
                request max-in-flight at first request
              
              
                L-Applin 3e29b8c
              
                remove stateful calls to FileAsyncResponseTransformer by creating it …
              
              
                L-Applin c44a520
              
                Wiremock test for s3AsyncClient with AsyncResponseTransformer.toFile
              
              
                L-Applin 418dcbd
              
                added 10_000 parts tests
              
              
                L-Applin db7d301
              
                add missing assertions to errorOnMiddlePart_retryable
              
              
                L-Applin 2f65a8c
              
                handle fileWriteOption
              
              
                L-Applin 0bfbdb2
              
                prevent over-requesting and fix potential race condition in FileSubsc…
              
              
                L-Applin 0a20a9c
              
                TM integration
              
              
                L-Applin e1aebb5
              
                fix tests
              
              
                L-Applin 3a0a0b1
              
                debugging for long stop before completing future
              
              
                L-Applin 2f3a14e
              
                added outstanding demand tracking, assert with checksum in integ test
              
              
                L-Applin e49aadc
              
                Merge remote-tracking branch 'origin/master'
              
              
                L-Applin aa0caf1
              
                Fix pendingTransformer polling
              
              
                L-Applin 29f899d
              
                Make download method of TM work properly with progressUpdater
              
              
                L-Applin 0aa1529
              
                remove logs
              
              
                L-Applin 426d82d
              
                add ParallelConfiguration for maxInFlightParts
              
              
                L-Applin 5bdc142
              
                javadoc and logs
              
              
                L-Applin 9551671
              
                Merge remote-tracking branch 'origin/master'
              
              
                L-Applin 7e19d14
              
                Merge remote-tracking branch 'origin/master' into feature/master/hagr…
              
              
                L-Applin d37056d
              
                Merge branch 'master' into feature/master/hagrid-multi
              
              
                L-Applin 474111b
              
                cleanup and changelog
              
              
                L-Applin 9ad9c56
              
                cleanup
              
              
                L-Applin 861d64d
              
                fix test utils after merge
              
              
                L-Applin c684d4c
              
                remove comment code in test
              
              
                L-Applin e34dcc3
              
                checkstyle
              
              
                L-Applin 8506c43
              
                rename initialPosition to position
              
              
                L-Applin f3d3773
              
                use firstMatchingHeader() instead of headers() in FileAsyncResponseTr…
              
              
                L-Applin e1360f7
              
                add handle error method
              
              
                L-Applin d1db3c8
              
                fix FileAsyncResponseTransformerPublisherTest wrong content-range met…
              
              
                L-Applin 50631f5
              
                use succeedsWithin assertion for future
              
              
                L-Applin 8f36d18
              
                fix checkstyle violations
              
              
                L-Applin a1f8d23
              
                fix checkstyle violations
              
              
                L-Applin 1716e46
              
                PR comments - first pass
              
              
                L-Applin 289554e
              
                PR comments - keep transformerCount in FileAsyncResponseTransformerPu…
              
              
                L-Applin e025fb3
              
                PR comments - keep transformerCount in FileAsyncResponseTransformerPu…
              
              
                L-Applin fe7985f
              
                rename supportsNonSerial to parallelSplitSupported, and other PR comm…
              
              
                L-Applin af94ef1
              
                Added file config options to FileAsyncResponseTransformerPublisherTest
              
              
                L-Applin 9902ec8
              
                PR comments
              
              
                L-Applin e6c824c
              
                checkstyle
              
              
                L-Applin 13098d3
              
                checkstyle
              
              
                L-Applin cfb68c0
              
                fix file issues in NonLinearMultipartDownloaderSubscriberWiremockTest.
              
              
                L-Applin 265c50c
              
                try to fix Jimfs tests again
              
              
                L-Applin d9588d8
              
                - NonLinearMultipartDownloaderSubscriber refactor based on zoewang pr…
              
              
                L-Applin cab2c1f
              
                Merge branch 'olapplin/changes-to-non-linear-subscriber' into olappli…
              
              
                L-Applin 039c4ce
              
                final ParallelMultipartDownloaderSubscriber fields
              
              
                L-Applin d3054bd
              
                mark ContentRangeParser as protected API
              
              
                L-Applin 732e337
              
                checkstyle
              
              
                L-Applin 1466a6e
              
                use debug logs
              
              
                L-Applin ea2f6aa
              
                use debug logs
              
              
                L-Applin f2eae2e
              
                fix S3MultipartClientGetObjectWiremockTest
              
              
                L-Applin File filter
Filter by extension
Conversations
          Failed to load comments.   
        
        
          
      Loading
        
  Jump to
        
          Jump to file
        
      
      
          Failed to load files.   
        
        
          
      Loading
        
  Diff view
Diff view
There are no files selected for viewing
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| { | ||
| "type": "feature", | ||
| "category": "S3", | ||
| "contributor": "", | ||
| "description": "Add support for parallel download for individual part-get for multipart GetObject in s3 async client and Transfer Manager" | ||
| } | 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              
        
          
  
    
      
          
            144 changes: 144 additions & 0 deletions
          
          144 
        
  ...k-core/src/main/java/software/amazon/awssdk/core/internal/async/EmittingSubscription.java
  
  
      
      
   
        
      
      
    
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,144 @@ | ||
| /* | ||
| * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"). | ||
| * You may not use this file except in compliance with the License. | ||
| * A copy of the License is located at | ||
| * | ||
| * http://aws.amazon.com/apache2.0 | ||
| * | ||
| * or in the "license" file accompanying this file. This file is distributed | ||
| * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
| * express or implied. See the License for the specific language governing | ||
| * permissions and limitations under the License. | ||
| */ | ||
|  | ||
| package software.amazon.awssdk.core.internal.async; | ||
|  | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import java.util.function.Supplier; | ||
| import org.reactivestreams.Subscriber; | ||
| import org.reactivestreams.Subscription; | ||
| import software.amazon.awssdk.annotations.SdkInternalApi; | ||
| import software.amazon.awssdk.annotations.ThreadSafe; | ||
| import software.amazon.awssdk.utils.Logger; | ||
|  | ||
| /** | ||
| * Subscription which can emit {@link Subscriber#onNext(T)} signals to a subscriber, based on the demand received with the | ||
| * {@link Subscription#request(long)}. It tracks the outstandingDemand that has not yet been fulfilled and used a Supplier | ||
| * passed to it to create the object it needs to emit. | ||
| * @param <T> the type of object to emit to the subscriber. | ||
| */ | ||
| @SdkInternalApi | ||
| @ThreadSafe | ||
| public final class EmittingSubscription<T> implements Subscription { | ||
| private static final Logger log = Logger.loggerFor(EmittingSubscription.class); | ||
|  | ||
| private Subscriber<? super T> downstreamSubscriber; | ||
| private final AtomicBoolean emitting; | ||
| private final AtomicLong outstandingDemand; | ||
| private final Runnable onCancel; | ||
| private final AtomicBoolean isCancelled; | ||
| private final Supplier<T> supplier; | ||
|  | ||
| private EmittingSubscription(Builder<T> builder) { | ||
| this.downstreamSubscriber = builder.downstreamSubscriber; | ||
| this.onCancel = builder.onCancel; | ||
| this.supplier = builder.supplier; | ||
| this.isCancelled = new AtomicBoolean(); | ||
| this.outstandingDemand = new AtomicLong(0); | ||
| this.emitting = new AtomicBoolean(); | ||
| } | ||
|  | ||
| public static <T> Builder<T> builder() { | ||
| return new Builder<>(); | ||
| } | ||
|  | ||
| @Override | ||
| public void request(long n) { | ||
| if (n <= 0) { | ||
| downstreamSubscriber.onError(new IllegalArgumentException("Amount requested must be positive")); | ||
| return; | ||
| } | ||
| long newDemand = outstandingDemand.updateAndGet(current -> { | ||
| if (Long.MAX_VALUE - current < n) { | ||
| return Long.MAX_VALUE; | ||
| } | ||
| return current + n; | ||
| }); | ||
| log.trace(() -> String.format("new outstanding demand: %s", newDemand)); | ||
| emit(); | ||
| } | ||
|  | ||
| @Override | ||
| public void cancel() { | ||
| isCancelled.set(true); | ||
| downstreamSubscriber = null; | ||
| onCancel.run(); | ||
| } | ||
|  | ||
| private void emit() { | ||
| do { | ||
| if (!emitting.compareAndSet(false, true)) { | ||
| return; | ||
| } | ||
| try { | ||
| if (doEmit()) { | ||
| return; | ||
| } | ||
| } finally { | ||
| emitting.compareAndSet(true, false); | ||
| } | ||
| } while (outstandingDemand.get() > 0); | ||
| } | ||
|  | ||
| private boolean doEmit() { | ||
| long demand = outstandingDemand.get(); | ||
|  | ||
| while (demand > 0) { | ||
| if (isCancelled.get()) { | ||
| return true; | ||
| } | ||
| if (outstandingDemand.get() > 0) { | ||
| demand = outstandingDemand.decrementAndGet(); | ||
| T value; | ||
| try { | ||
| value = supplier.get(); | ||
| } catch (Exception e) { | ||
| downstreamSubscriber.onError(e); | ||
| return true; | ||
| } | ||
| downstreamSubscriber.onNext(value); | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|  | ||
| public static class Builder<T> { | ||
| private Subscriber<? super T> downstreamSubscriber; | ||
| private Runnable onCancel; | ||
| private Supplier<T> supplier; | ||
|  | ||
| public Builder<T> downstreamSubscriber(Subscriber<? super T> subscriber) { | ||
| this.downstreamSubscriber = subscriber; | ||
| return this; | ||
| } | ||
|  | ||
| public Builder<T> onCancel(Runnable onCancel) { | ||
| this.onCancel = onCancel; | ||
| return this; | ||
| } | ||
|  | ||
| public Builder<T> supplier(Supplier<T> supplier) { | ||
| this.supplier = supplier; | ||
| return this; | ||
| } | ||
|  | ||
| public EmittingSubscription<T> build() { | ||
| return new EmittingSubscription<>(this); | ||
| } | ||
| } | ||
|  | ||
|  | ||
| } | 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              
      
      Oops, something went wrong.
        
    
  
      
      Oops, something went wrong.
        
    
  
  Add this suggestion to a batch that can be applied as a single commit.
  This suggestion is invalid because no changes were made to the code.
  Suggestions cannot be applied while the pull request is closed.
  Suggestions cannot be applied while viewing a subset of changes.
  Only one suggestion per line can be applied in a batch.
  Add this suggestion to a batch that can be applied as a single commit.
  Applying suggestions on deleted lines is not supported.
  You must change the existing code in this line in order to create a valid suggestion.
  Outdated suggestions cannot be applied.
  This suggestion has been applied or marked resolved.
  Suggestions cannot be applied from pending reviews.
  Suggestions cannot be applied on multi-line comments.
  Suggestions cannot be applied while the pull request is queued to merge.
  Suggestion cannot be applied right now. Please check back later.
  
    
  
    
Uh oh!
There was an error while loading. Please reload this page.