Skip to content

Commit 93f0b78

Browse files
committed
Retry stream SAC unregister consumer operation
Retry unregistering a stream from its group in case of stream coordinator timeout/unavailability. The operation can fail during or after a network partition, which is normally, but it is harmless to retry it to clean up the SAC group. The operation is idempotent anyway.
1 parent df2dd96 commit 93f0b78

File tree

1 file changed

+17
-3
lines changed

1 file changed

+17
-3
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4045,9 +4045,10 @@ sac_register_consumer(VH, St, PartitionIndex, Name, Pid, ConnName, SubId) ->
40454045
end).
40464046

40474047
sac_unregister_consumer(VH, St, Name, Pid, SubId) ->
4048-
sac_call(fun() ->
4049-
?SAC_MOD:unregister_consumer(VH, St, Name, Pid, SubId)
4050-
end).
4048+
Call = fun() ->
4049+
?SAC_MOD:unregister_consumer(VH, St, Name, Pid, SubId)
4050+
end,
4051+
sac_call(retryable_sac_call(Call)).
40514052

40524053
sac_call(Call) ->
40534054
case Call() of
@@ -4063,3 +4064,16 @@ sac_call(Call) ->
40634064
R ->
40644065
R
40654066
end.
4067+
4068+
retryable_sac_call(Call) ->
4069+
fun() -> retry_sac_call(Call, 3) end.
4070+
4071+
retry_sac_call(_Call, 0) ->
4072+
{error, coordinator_unavailable};
4073+
retry_sac_call(Call, N) ->
4074+
case Call() of
4075+
{error, coordinator_unavailable} ->
4076+
retry_sac_call(Call, N - 1);
4077+
R ->
4078+
R
4079+
end.

0 commit comments

Comments
 (0)