Skip to content

Conversation

@dianfu
Copy link
Contributor

@dianfu dianfu commented Oct 29, 2025

What is the purpose of the change

*This pull request adds retry support for async function in Python DataStream API. *

Brief change log

    • Introduces API (AsyncDataStream.unordered_wait_with_retry) for retry support*
    • Introduces RetryableResultHandler to handle retry*

Verifying this change

This change added tests and can be verified as follows:

  • Added tests test_async_with_retry

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (not documented, will be added in a separate pull request)

@flinkbot
Copy link
Collaborator

flinkbot commented Oct 29, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@bgeng777 bgeng777 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, just some minor comments

async_retry_strategy = AsyncRetryStrategy.fixed_delay(
max_attempts=5,
backoff_time_millis=1000,
result_predicate=None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be nicer if we add a test for result_predicate


if (not self._is_timeout() and satisfy and
self._retry_strategy.can_retry(self._current_attempts)):

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra blank line

capacity, output_type)

@staticmethod
def unordered_wait_with_retry(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we prefer introducing unordered_wait_with_retry instead of adding a new parameter(AsyncRetryStrategy, default is None) to unordered_wait

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to follow the design of Java API.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java's AsyncWaitOperator has only 1 constructor with asyncRetryStrategy as an parameter; so here you mean org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory#AsyncWaitOperatorFactory ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean this

public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitWithRetry(

output_type: TypeInformation = None) -> 'DataStream':
"""
Adds an async function with an AsyncRetryStrategy to support retry of AsyncFunction to the
data stream. The order of output stream records may be reordered.
Copy link
Contributor

@davidradl davidradl Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is because we hard code AsyncFunctionDescriptor.OutputMode.UNORDERED. Is there thinking about an ordered version of this?

wording nit: The order of output stream records may be reordered. -> The output stream records may be reordered.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ordered version is to be supported in https://issues.apache.org/jira/browse/FLINK-38560 (PR is already ready and will submit for review once this PR is merged.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wording nit: The order of output stream records may be reordered. -> The output stream records may be reordered.

Oh, this description is copied from the Java API.

Copy link
Contributor

@bgeng777 bgeng777 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@dianfu dianfu closed this in 2c4a45c Oct 29, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants