Skip to content

Commit fe41c15

Browse files
committed
Refactor SAC coordinator for SAC in partition context
References #3753
1 parent 88c6347 commit fe41c15

File tree

1 file changed

+87
-60
lines changed

1 file changed

+87
-60
lines changed

deps/rabbitmq_stream/src/rabbit_stream_sac_coordinator.erl

Lines changed: 87 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -145,25 +145,30 @@ handle_call({register_consumer,
145145
StreamGroups0),
146146
Group0 =
147147
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups1),
148-
Consumer =
148+
FormerActive =
149+
case lookup_active_consumer(Group0) of
150+
{value, FA} ->
151+
FA;
152+
false ->
153+
undefined
154+
end,
155+
Consumer0 =
149156
#consumer{pid = ConnectionPid,
150157
subscription_id = SubscriptionId,
151158
active = false},
152-
Group1 = add_to_group(Consumer, Group0),
153-
Active = compute_active_flag(Consumer, Group1),
154-
#group{consumers = Consumers0} = Group1,
155-
Consumers1 = update_active_flag(Consumer, Active, Consumers0),
159+
Group1 = add_to_group(Consumer0, Group0),
160+
Group2 = compute_active_consumer(Group1),
156161
StreamGroups2 =
157162
update_groups(VirtualHost,
158163
Stream,
159164
ConsumerName,
160-
Group1#group{consumers = Consumers1},
165+
Group2,
161166
StreamGroups1),
162-
ConnectionPid
163-
! {sac,
164-
{{subscription_id, SubscriptionId}, {active, Active},
165-
{side_effects, []}}},
166167

168+
{value, Consumer1} =
169+
lookup_consumer(ConnectionPid, SubscriptionId, Group2),
170+
notify_consumers(FormerActive, Consumer1, Group2),
171+
#consumer{active = Active} = Consumer1,
167172
{reply, {ok, Active}, State#state{groups = StreamGroups2}};
168173
handle_call({unregister_consumer,
169174
VirtualHost,
@@ -177,62 +182,36 @@ handle_call({unregister_consumer,
177182
error ->
178183
State0;
179184
Group0 ->
180-
#group{consumers = Consumers0} = Group0,
181-
Consumers1 =
182-
case lists:search(fun(#consumer{pid = ConnPid,
183-
subscription_id = SubId}) ->
184-
ConnPid == ConnectionPid
185-
andalso SubId == SubscriptionId
186-
end,
187-
Consumers0)
185+
Group1 =
186+
case lookup_consumer(ConnectionPid, SubscriptionId, Group0)
188187
of
189188
{value, Consumer} ->
190189
rabbit_log:debug("Unregistering consumer ~p from group",
191190
[Consumer]),
192-
case Consumer of
193-
#consumer{active = true} ->
194-
rabbit_log:debug("Unregistering the active consumer"),
195-
%% this is active one, remove it and notify the new active one if group not empty
196-
Cs = lists:delete(Consumer, Consumers0),
197-
case Cs of
198-
[] ->
199-
%% group is empty now
200-
rabbit_log:debug("Group is now empty"),
201-
Cs;
202-
_ ->
203-
%% get new active one (the first) and notify it
204-
NewActive = lists:nth(1, Cs),
205-
#consumer{pid = Pid,
206-
subscription_id = SubId} =
207-
NewActive,
208-
rabbit_log:debug("New active consumer is ~p ~p",
209-
[Pid, SubId]),
210-
Pid
211-
! {sac,
212-
{{subscription_id, SubId},
213-
{active, true},
214-
{side_effects, []}}},
215-
update_active_flag(NewActive, true,
216-
Cs)
217-
end;
218-
_ActiveConsumer ->
219-
rabbit_log:debug("Not the active consumer, just removing it from "
220-
"the group"),
221-
lists:delete(Consumer, Consumers0)
222-
end;
223-
error ->
191+
G1 = remove_from_group(Consumer, Group0),
192+
G2 = compute_active_consumer(G1),
193+
NewActive =
194+
case lookup_active_consumer(G2) of
195+
{value, AC} ->
196+
AC;
197+
false ->
198+
undefined
199+
end,
200+
notify_consumers(Consumer, NewActive, G2),
201+
G2;
202+
false ->
224203
rabbit_log:debug("Could not find consumer ~p ~p in group ~p ~p ~p",
225204
[ConnectionPid,
226205
SubscriptionId,
227206
VirtualHost,
228207
Stream,
229208
ConsumerName]),
230-
Consumers0
209+
Group0
231210
end,
232211
SGS = update_groups(VirtualHost,
233212
Stream,
234213
ConsumerName,
235-
Group0#group{consumers = Consumers1},
214+
Group1,
236215
StreamGroups0),
237216
State0#state{groups = SGS}
238217
end,
@@ -260,14 +239,62 @@ lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->
260239
add_to_group(Consumer, #group{consumers = Consumers} = Group) ->
261240
Group#group{consumers = Consumers ++ [Consumer]}.
262241

263-
compute_active_flag(Consumer,
264-
#group{partition_index = -1, consumers = [Consumer]}) ->
265-
true;
266-
compute_active_flag(Consumer,
267-
#group{partition_index = -1, consumers = [Consumer | _]}) ->
268-
true;
269-
compute_active_flag(_, _) ->
270-
false.
242+
remove_from_group(Consumer, #group{consumers = Consumers} = Group) ->
243+
Group#group{consumers = lists:delete(Consumer, Consumers)}.
244+
245+
compute_active_consumer(#group{consumers = []} = Group) ->
246+
Group;
247+
compute_active_consumer(#group{partition_index = -1,
248+
consumers = [Consumer0]} =
249+
Group0) ->
250+
Consumer1 = Consumer0#consumer{active = true},
251+
Group0#group{consumers = [Consumer1]};
252+
compute_active_consumer(#group{partition_index = -1,
253+
consumers = [Consumer0 | T]} =
254+
Group0) ->
255+
Consumer1 = Consumer0#consumer{active = true},
256+
Consumers = lists:map(fun(C) -> C#consumer{active = false} end, T),
257+
Group0#group{consumers = [Consumer1] ++ Consumers}.
258+
259+
lookup_consumer(ConnectionPid, SubscriptionId,
260+
#group{consumers = Consumers}) ->
261+
lists:search(fun(#consumer{pid = ConnPid, subscription_id = SubId}) ->
262+
ConnPid == ConnectionPid andalso SubId == SubscriptionId
263+
end,
264+
Consumers).
265+
266+
lookup_active_consumer(#group{consumers = Consumers}) ->
267+
lists:search(fun(#consumer{active = Active}) -> Active end,
268+
Consumers).
269+
270+
notify_consumers(_, _, #group{consumers = []}) ->
271+
ok;
272+
notify_consumers(_FormerActive,
273+
#consumer{pid = ConnectionPid,
274+
subscription_id = SubscriptionId} =
275+
NewConsumer,
276+
#group{partition_index = -1, consumers = [NewConsumer]}) ->
277+
ConnectionPid
278+
! {sac,
279+
{{subscription_id, SubscriptionId}, {active, true},
280+
{side_effects, []}}};
281+
notify_consumers(_FormerActive,
282+
#consumer{pid = ConnectionPid,
283+
subscription_id = SubscriptionId} =
284+
NewConsumer,
285+
#group{partition_index = -1, consumers = [NewConsumer | _]}) ->
286+
ConnectionPid
287+
! {sac,
288+
{{subscription_id, SubscriptionId}, {active, true},
289+
{side_effects, []}}};
290+
notify_consumers(_FormerActive,
291+
#consumer{pid = ConnectionPid,
292+
subscription_id = SubscriptionId},
293+
#group{partition_index = -1, consumers = _}) ->
294+
ConnectionPid
295+
! {sac,
296+
{{subscription_id, SubscriptionId}, {active, false},
297+
{side_effects, []}}}.
271298

272299
update_active_flag(Consumer, Active, Consumers) ->
273300
lists:foldl(fun (C, Acc) when C == Consumer ->

0 commit comments

Comments
 (0)