[FLINK-38561][python] Add retry support for async function in Python DataStream API #27164
Thanks for the PR, just some minor comments
    async_retry_strategy = AsyncRetryStrategy.fixed_delay(
        max_attempts=5,
        backoff_time_millis=1000,
        result_predicate=None,
It could be nicer if we add a test for result_predicate
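For illustration, here is a minimal sketch of what a result predicate does in a fixed-delay retry loop. It is not PyFlink code: `retry_fixed_delay` and `make_flaky_lookup` are hypothetical helpers that only borrow the parameter names from the snippet above, and the timeout check is left out.

```python
import asyncio


async def retry_fixed_delay(func, max_attempts, backoff_time_millis, result_predicate=None):
    """Hypothetical helper (not part of PyFlink): retry `func` with a fixed delay."""
    attempts = 0
    while True:
        attempts += 1
        result = await func()
        # Retry only if a predicate was supplied and it flags this result as unsatisfactory.
        should_retry = result_predicate is not None and result_predicate(result)
        if not should_retry or attempts >= max_attempts:
            return result, attempts
        await asyncio.sleep(backoff_time_millis / 1000)


def make_flaky_lookup(empty_calls):
    """Hypothetical async lookup that returns an empty list for the first `empty_calls` calls."""
    calls = {"n": 0}

    async def lookup():
        calls["n"] += 1
        return [] if calls["n"] <= empty_calls else ["value"]

    return lookup


# Retry while the result is empty; the third call succeeds.
result, attempts = asyncio.run(
    retry_fixed_delay(
        make_flaky_lookup(2),
        max_attempts=5,
        backoff_time_millis=10,
        result_predicate=lambda r: len(r) == 0,
    )
)
```

With `result_predicate=None` the same helper returns after the first call, which is the case the existing test exercises.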
    if (not self._is_timeout() and satisfy and
            self._retry_strategy.can_retry(self._current_attempts)):
extra blank line
            capacity, output_type)

    @staticmethod
    def unordered_wait_with_retry(
Why do we prefer introducing unordered_wait_with_retry instead of adding a new parameter (an AsyncRetryStrategy, defaulting to None) to unordered_wait?
This is to follow the design of the Java API.
Java's AsyncWaitOperator has only one constructor, which takes asyncRetryStrategy as a parameter. So do you mean org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory#AsyncWaitOperatorFactory?
I mean this:
Line 217 in 5b61b1c
    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitWithRetry(
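The two API shapes discussed in this thread can be compared with a small sketch. Everything here is hypothetical and simplified: the class names, the `_add_async_operator` helper, and the parameter lists are illustrative assumptions, not the actual PyFlink signatures.

```python
from typing import Any, Callable, Dict, Optional


def _add_async_operator(stream: Any, func: Callable, timeout: float,
                        capacity: int, retry_strategy: Optional[Any]) -> Dict[str, Any]:
    # Hypothetical shared implementation: just records what would be configured.
    return {"stream": stream, "func": func, "timeout": timeout,
            "capacity": capacity, "retry_strategy": retry_strategy}


class SeparateMethodStyle:
    """Mirrors the Java layout: a dedicated method for the retry variant."""

    @staticmethod
    def unordered_wait(stream, func, timeout, capacity=100):
        return _add_async_operator(stream, func, timeout, capacity, None)

    @staticmethod
    def unordered_wait_with_retry(stream, func, timeout, retry_strategy, capacity=100):
        return _add_async_operator(stream, func, timeout, capacity, retry_strategy)


class OptionalParameterStyle:
    """Alternative raised in review: one method with an optional retry strategy."""

    @staticmethod
    def unordered_wait(stream, func, timeout, capacity=100, retry_strategy=None):
        return _add_async_operator(stream, func, timeout, capacity, retry_strategy)


strategy = object()  # stand-in for an AsyncRetryStrategy instance
separate = SeparateMethodStyle.unordered_wait_with_retry("s", len, 10.0, strategy)
optional = OptionalParameterStyle.unordered_wait("s", len, 10.0, retry_strategy=strategy)
```

Both styles configure the same operator; the difference is only whether the Python surface tracks the Java method names one to one.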
            output_type: TypeInformation = None) -> 'DataStream':
        """
        Adds an async function with an AsyncRetryStrategy to support retry of AsyncFunction to the
        data stream. The order of output stream records may be reordered.
I assume this is because we hard-code AsyncFunctionDescriptor.OutputMode.UNORDERED. Is there a plan for an ordered version of this?
Wording nit: "The order of output stream records may be reordered." -> "The output stream records may be reordered."
The ordered version will be supported in https://issues.apache.org/jira/browse/FLINK-38560. (The PR is ready and will be submitted for review once this PR is merged.)
Regarding the wording nit ("The order of output stream records may be reordered." -> "The output stream records may be reordered."): this description is copied from the Java API.
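To make the ordered versus unordered distinction concrete, here is a small plain-asyncio sketch. It does not use PyFlink; `lookup`, `ordered`, and `unordered` are hypothetical names, and the delays only simulate async requests that finish at different times.

```python
import asyncio


async def lookup(value, delay):
    # Simulated async request that completes after `delay` seconds.
    await asyncio.sleep(delay)
    return value


async def ordered(inputs):
    # gather() returns results in input order, regardless of completion time.
    return await asyncio.gather(*(lookup(v, d) for v, d in inputs))


async def unordered(inputs):
    # as_completed() yields results as they finish, so input order may be lost.
    tasks = [asyncio.create_task(lookup(v, d)) for v, d in inputs]
    results = []
    for finished in asyncio.as_completed(tasks):
        results.append(await finished)
    return results


inputs = [("a", 0.3), ("b", 0.0), ("c", 0.15)]
ordered_out = asyncio.run(ordered(inputs))
unordered_out = asyncio.run(unordered(inputs))
```

Here `ordered_out` keeps the input order, while `unordered_out` starts with the fastest request, which is the behaviour the docstring describes as records being reordered.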
LGTM!
What is the purpose of the change
This pull request adds retry support for async functions in the Python DataStream API.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation