- 
                Notifications
    You must be signed in to change notification settings 
- Fork 13.8k
[FLINK-38497][connector] FLIP-535:Introduce RateLimiter to Source. #27134
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
        
          
                .../src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
      | Hi @leonardBang @ruanhang1993, maybe you can help to review this. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @lvyanquan for the contribution, I left some comments. And you need to open a task to add documentation for new feature
        
          
                ...ava/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                ...ava/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                ...ector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                ...ector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                ...ector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
      | @Override | ||
| public CompletionStage<Void> acquire() { | ||
| CompletionStage<Void> stage = rateLimiter.acquire(); | ||
| public CompletionStage<Void> acquire(int requestSize) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we say requestSize - does this mean the size of requests? Looking at the code it seems to relate to the number of futures we want to acquire, if so I would suggest renaming this. Maybe numberOfFuturesToAcquire.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The returned Future here represents a single asynchronous execution result, and there is no scenario where multiple Future instances are returned. Therefore, the variable name numberOfFutures is not appropriate.
The input parameter here represents the number of requests, which could specifically refer to the number (or even size) of data records. Since this interface is not exclusively called by SourceReader, I used the more generic term "Request" to abstract the concept beyond data-specific contexts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok thanks for the clarification. numberOfRequests seems better to me as it is not todo the with the size of individual requests. It is up to you if you want to change or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to use numberOfRequests @lvyanquan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree to use numberOf instead of size.
Note that the original comment for the acquire() method states:
flink/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
Line 33 in 7b0a882
| * Returns a future that is completed once another event would not exceed the rate limit. For | 
Using
event is more appropriate than request in this context, hence the term numberOfEvents is used now.
            
          
                ...r-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
          
            Show resolved
            Hide resolved
        
      | Thanks for suggestion of @davidradl and @leonardBang, updated. | 
| @flinkbot run azure | 
| Subtask for document: https://issues.apache.org/jira/browse/FLINK-38589. | 
| @flinkbot run azure | 
5312e42    to
    4d22a10      
    Compare
  
    | Rebase master to fix CI failure of StreamExecutionEnvironmentTests::test_set_requirements_with_cached_directory. | 
        
          
                ...ector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
      1746af7    to
    247ae2b      
    Compare
  
    | Updated based on comments. | 
| @flinkbot run azure | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @lvyanquan for the update, generally looks good to me, merging...
What is the purpose of the change
Provide a configuration solution for fine-grained read speed control in the Flink Source connector.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes)Documentation