@@ -109,10 +109,13 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
109109
110110 @ Nullable protected final RecordEvaluator <T > eofRecordEvaluator ;
111111
112- @ Nullable protected RateLimiter <SplitT > rateLimiter ;
112+ /** Indicating whether the SourceReader is currently rate limited. */
113+ private final boolean isRateLimited ;
114+
115+ private final RateLimiter <SplitT > rateLimiter ;
113116
114117 /** Future that tracks the result of acquiring permission from {@link #rateLimiter}. */
115- @ Nullable protected CompletableFuture <Void > rateLimitPermissionFuture ;
118+ @ Nullable private CompletableFuture <Void > rateLimitPermissionFuture ;
116119
117120 /**
118121 * The primary constructor for the source reader.
@@ -155,10 +158,12 @@ public SourceReaderBase(
155158 this .eofRecordEvaluator = eofRecordEvaluator ;
156159
157160 numRecordsInCounter = context .metricGroup ().getIOMetricGroup ().getNumRecordsInCounter ();
161+ isRateLimited = rateLimiterStrategy != null ;
162+ LOG .info ("Rate limiting of SourceReader is " + (isRateLimited ? "enabled" : "disabled" ));
158163 rateLimiter =
159- rateLimiterStrategy == null
160- ? null
161- : rateLimiterStrategy . createRateLimiter ( context . currentParallelism ()) ;
164+ isRateLimited
165+ ? rateLimiterStrategy . createRateLimiter ( context . currentParallelism ())
166+ : null ;
162167 }
163168
164169 @ Override
@@ -178,7 +183,7 @@ public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
178183 // we need to loop here, because we may have to go across splits
179184 while (true ) {
180185 // Check if the previous record count reached the limit of rateLimiter.
181- if (rateLimitPermissionFuture != null && !rateLimitPermissionFuture .isDone ()) {
186+ if (isRateLimited && !rateLimitPermissionFuture .isDone ()) {
182187 return trace (InputStatus .MORE_AVAILABLE );
183188 }
184189 // Process one record.
@@ -188,17 +193,19 @@ public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
188193 numRecordsInCounter .inc (1 );
189194 recordEmitter .emitRecord (record , currentSplitOutput , currentSplitContext .state );
190195 LOG .trace ("Emitted record: {}" , record );
191- if (rateLimiter != null ) {
192- RecordCountingSourceOutputWrapper <T > recordCountingSourceOutputWrapper =
193- (RecordCountingSourceOutputWrapper <T >) currentSplitOutput ;
194- if (recordCountingSourceOutputWrapper . getRecordCount () > 0 ) {
196+ if (isRateLimited ) {
197+ RateLimitingSourceOutputWrapper <T > rateLimitingSourceOutputWrapper =
198+ (RateLimitingSourceOutputWrapper <T >) currentSplitOutput ;
199+ if (rateLimitingSourceOutputWrapper . getCurrentWindowRecordCount () > 0 ) {
195200 // Acquire permit from rateLimiter.
196201 rateLimitPermissionFuture =
197202 rateLimiter
198- .acquire (recordCountingSourceOutputWrapper .getRecordCount ())
203+ .acquire (
204+ rateLimitingSourceOutputWrapper
205+ .getCurrentWindowRecordCount ())
199206 .toCompletableFuture ();
200207 }
201- recordCountingSourceOutputWrapper . resetRecordCount ();
208+ rateLimitingSourceOutputWrapper . resetWindowRecordCount ();
202209 }
203210 // We always emit MORE_AVAILABLE here, even though we do not strictly know whether
204211 // more is available. If nothing more is available, the next invocation will find
@@ -281,14 +288,13 @@ record -> {
281288 return true ;
282289 };
283290 }
284- boolean rateLimited = this .rateLimiter != null ;
285- if (rateLimited ) {
291+ if (isRateLimited ) {
286292 rateLimiter .notifyStatusChange (
287293 this .toSplitType (
288294 this .currentSplitContext .splitId , this .currentSplitContext .state ));
289295 }
290296 currentSplitOutput =
291- currentSplitContext .getOrCreateSplitOutput (output , eofRecordHandler , rateLimited );
297+ currentSplitContext .getOrCreateSplitOutput (output , eofRecordHandler , isRateLimited );
292298 LOG .trace ("Emitting records from fetch for split {}" , nextSplitId );
293299 return true ;
294300 }
@@ -311,7 +317,7 @@ public List<SplitT> snapshotState(long checkpointId) {
311317 public void notifyCheckpointComplete (long checkpointId ) throws Exception {
312318 splitStates .forEach (
313319 (id , context ) -> {
314- if (rateLimiter != null ) {
320+ if (isRateLimited ) {
315321 rateLimiter .notifyCheckpointComplete (checkpointId );
316322 }
317323 });
@@ -419,7 +425,7 @@ private SplitContext(String splitId, SplitStateT state) {
419425 SourceOutput <T > getOrCreateSplitOutput (
420426 ReaderOutput <T > mainOutput ,
421427 @ Nullable Function <T , Boolean > eofRecordHandler ,
422- boolean rateLimited ) {
428+ boolean isRateLimited ) {
423429 if (sourceOutput == null ) {
424430 // The split output should have been created when AddSplitsEvent was processed in
425431 // SourceOperator. Here we just use this method to get the previously created
@@ -428,8 +434,8 @@ SourceOutput<T> getOrCreateSplitOutput(
428434 if (eofRecordHandler != null ) {
429435 sourceOutput = new SourceOutputWrapper <>(sourceOutput , eofRecordHandler );
430436 }
431- if (rateLimited ) {
432- sourceOutput = new RecordCountingSourceOutputWrapper <>(sourceOutput );
437+ if (isRateLimited ) {
438+ sourceOutput = new RateLimitingSourceOutputWrapper <>(sourceOutput );
433439 }
434440 }
435441 return sourceOutput ;
@@ -495,29 +501,30 @@ private boolean isEndOfStreamReached(T record) {
495501 }
496502
497503 /**
498- * A wrapper around {@link SourceOutput} that counts the number of records emitted.
504+ * A wrapper around {@link SourceOutput} that counts the number of records during the current
505+ * rate-limiting window.
499506 *
500507 * <p>This wrapper is used when rate limiting is enabled to track how many records have been
501508 * emitted since the last rate limit check, allowing the reader to properly apply backpressure
502509 * when the rate limit is exceeded.
503510 *
504511 * @param <T> The type of records being emitted
505512 */
506- private static final class RecordCountingSourceOutputWrapper <T > implements SourceOutput <T > {
513+ private static final class RateLimitingSourceOutputWrapper <T > implements SourceOutput <T > {
507514 /** The underlying source output to delegate to. */
508515 final SourceOutput <T > sourceOutput ;
509516
510- /** The number of records emitted since the last reset . */
511- int recordCount ;
517+ /** Count of records handled during the current rate-limiting window . */
518+ int currentWindowRecordCount ;
512519
513520 /**
514521 * Creates a new RecordCountingSourceOutputWrapper.
515522 *
516523 * @param sourceOutput The underlying source output to wrap
517524 */
518- public RecordCountingSourceOutputWrapper (SourceOutput <T > sourceOutput ) {
525+ public RateLimitingSourceOutputWrapper (SourceOutput <T > sourceOutput ) {
519526 this .sourceOutput = sourceOutput ;
520- this .recordCount = 0 ;
527+ this .currentWindowRecordCount = 0 ;
521528 }
522529
523530 @ Override
@@ -538,27 +545,27 @@ public void markActive() {
538545 @ Override
539546 public void collect (T record ) {
540547 sourceOutput .collect (record );
541- recordCount ++;
548+ currentWindowRecordCount ++;
542549 }
543550
544551 @ Override
545552 public void collect (T record , long timestamp ) {
546553 sourceOutput .collect (record , timestamp );
547- recordCount ++;
554+ currentWindowRecordCount ++;
548555 }
549556
550557 /**
551- * Gets the number of records emitted .
558+ * Gets the currentWindowRecordCount .
552559 *
553- * @return the number of records emitted .
560+ * @return the number of currentWindowRecordCount .
554561 */
555- public int getRecordCount () {
556- return recordCount ;
562+ public int getCurrentWindowRecordCount () {
563+ return currentWindowRecordCount ;
557564 }
558565
559- /** Resets the record count to 0. */
560- public void resetRecordCount () {
561- recordCount = 0 ;
566+ /** Resets the currentWindowRecordCount to 0. */
567+ public void resetWindowRecordCount () {
568+ currentWindowRecordCount = 0 ;
562569 }
563570 }
564571}
0 commit comments