Skip to content

Commit 550541d

Browse files
committed
Fix stream SAC computation and notification
Make sure to notify former active when unsubscription triggers rebalancing. References #3753
1 parent 9758753 commit 550541d

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

deps/rabbitmq_stream/src/rabbit_stream_sac_coordinator.erl

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,8 @@ handle_call({unregister_consumer,
199199
{value, Consumer} ->
200200
rabbit_log:debug("Unregistering consumer ~p from group",
201201
[Consumer]),
202+
{value, ActiveInPreviousGroupInstance} =
203+
lookup_active_consumer(Group0),
202204
G1 = remove_from_group(Consumer, Group0),
203205
rabbit_log:debug("Consumer removed from group: ~p",
204206
[G1]),
@@ -212,7 +214,14 @@ handle_call({unregister_consumer,
212214
false ->
213215
undefined
214216
end,
215-
notify_consumers(undefined, NewActive, G2),
217+
AIPGI =
218+
case ActiveInPreviousGroupInstance of
219+
Consumer ->
220+
undefined;
221+
_ ->
222+
ActiveInPreviousGroupInstance
223+
end,
224+
notify_consumers(AIPGI, NewActive, G2),
216225
G2;
217226
false ->
218227
rabbit_log:debug("Could not find consumer ~p ~p in group ~p ~p ~p",
@@ -334,6 +343,10 @@ notify_consumers(undefined,
334343
! {sac,
335344
{{subscription_id, SubscriptionId}, {active, true},
336345
{side_effects, []}}};
346+
notify_consumers(ActiveInPreviousGroupInstance, NewActive, _Group)
347+
when ActiveInPreviousGroupInstance == NewActive ->
348+
%% no changes (e.g. on unsubscription), nothing to do.
349+
ok;
337350
notify_consumers(#consumer{pid = FormerConnPid,
338351
subscription_id = FormerSubId},
339352
#consumer{pid = NewConnPid,

0 commit comments

Comments
 (0)