Skip to content

Commit 796268b

Browse files
committed
Handle SAC (simple) failover
When consumers unsubscribe normally. References #3753
1 parent 8c19092 commit 796268b

File tree

2 files changed

+193
-50
lines changed

2 files changed

+193
-50
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 72 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
member_pid :: pid(),
3838
offset :: osiris:offset(),
3939
subscription_id :: subscription_id(),
40-
segment :: osiris_log:state(),
40+
segment :: undefined | osiris_log:state(),
4141
credit :: integer(),
4242
stream :: stream(),
4343
counters :: atomics:atomics_ref(),
@@ -763,6 +763,34 @@ open(info, {OK, S, Data},
763763
StatemData#statem_data{connection = Connection1,
764764
connection_state = State2}}
765765
end;
766+
open(info, {sac, {{subscription_id, SubId}, {active, Active}}},
767+
#statem_data{transport = Transport,
768+
connection = Connection0,
769+
connection_state = ConnState0} =
770+
State) ->
771+
rabbit_log:debug("Subscription ~p instructed to become active: ~p",
772+
[SubId, Active]),
773+
#stream_connection_state{consumers = Consumers0} = ConnState0,
774+
{Connection1, ConnState1} =
775+
case Consumers0 of
776+
#{SubId := Consumer0} ->
777+
Consumer1 = Consumer0#consumer{active = Active},
778+
Conn1 =
779+
maybe_notify_consumer(Transport,
780+
Connection0,
781+
SubId,
782+
Active,
783+
true),
784+
{Conn1,
785+
ConnState0#stream_connection_state{consumers =
786+
Consumers0#{SubId =>
787+
Consumer1}}};
788+
_ ->
789+
{Connection0, ConnState0}
790+
end,
791+
{keep_state,
792+
State#statem_data{connection = Connection1,
793+
connection_state = ConnState1}};
766794
open(info, {Closed, Socket}, #statem_data{connection = Connection})
767795
when Closed =:= tcp_closed; Closed =:= ssl_closed ->
768796
demonitor_all_streams(Connection),
@@ -1001,15 +1029,19 @@ open(cast,
10011029
maps:remove(StreamName,
10021030
StreamSubscriptions)},
10031031
State};
1004-
CorrelationIds when is_list(CorrelationIds) ->
1032+
SubscriptionIds when is_list(SubscriptionIds) ->
10051033
Consumers1 =
1006-
lists:foldl(fun(CorrelationId, ConsumersAcc) ->
1007-
#{CorrelationId := Consumer} = ConsumersAcc,
1008-
#consumer{credit = Credit} = Consumer,
1034+
lists:foldl(fun(SubscriptionId, ConsumersAcc) ->
1035+
#{SubscriptionId := Consumer} = ConsumersAcc,
1036+
#consumer{credit = Credit,
1037+
segment = Segment} =
1038+
Consumer,
10091039
Consumer1 =
1010-
case Credit of
1011-
0 -> Consumer;
1012-
_ ->
1040+
case {Credit, Segment} of
1041+
{_, undefined} ->
1042+
Consumer; %% SAC not active
1043+
{0, _} -> Consumer;
1044+
{_, _} ->
10131045
case send_chunks(Transport,
10141046
Consumer,
10151047
SendFileOct)
@@ -1034,9 +1066,9 @@ open(cast,
10341066
Credit1}
10351067
end
10361068
end,
1037-
ConsumersAcc#{CorrelationId => Consumer1}
1069+
ConsumersAcc#{SubscriptionId => Consumer1}
10381070
end,
1039-
Consumers, CorrelationIds),
1071+
Consumers, SubscriptionIds),
10401072
{Connection,
10411073
State#stream_connection_state{consumers = Consumers1}}
10421074
end,
@@ -1812,7 +1844,8 @@ handle_frame_post_auth(Transport,
18121844
atomics:new(2, [{signed, false}]),
18131845

18141846
Active =
1815-
maybe_register_consumer(Stream,
1847+
maybe_register_consumer(VirtualHost,
1848+
Stream,
18161849
ConsumerName,
18171850
SubscriptionId,
18181851
Sac),
@@ -1889,6 +1922,8 @@ handle_frame_post_auth(Transport,
18891922
Connection,
18901923
#stream_connection_state{consumers = Consumers} = State,
18911924
{credit, SubscriptionId, Credit}) ->
1925+
%% FIXME check consumer is active
1926+
%% if not active, don't dispatch, just return appropriate response
18921927
case Consumers of
18931928
#{SubscriptionId := Consumer} ->
18941929
#consumer{credit = AvailableCredit} = Consumer,
@@ -2573,11 +2608,16 @@ maybe_dispatch_on_subscription(_Transport,
25732608
Consumers1 = Consumers#{SubscriptionId => ConsumerState},
25742609
State#stream_connection_state{consumers = Consumers1}.
25752610

2576-
maybe_register_consumer(_, _, _, false = _Sac) ->
2611+
maybe_register_consumer(_, _, _, _, false = _Sac) ->
25772612
true;
2578-
maybe_register_consumer(Stream, ConsumerName, SubscriptionId, true) ->
2613+
maybe_register_consumer(VirtualHost,
2614+
Stream,
2615+
ConsumerName,
2616+
SubscriptionId,
2617+
true) ->
25792618
{ok, Active} =
2580-
rabbit_stream_sac_coordinator:register_consumer(Stream,
2619+
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
2620+
Stream,
25812621
ConsumerName,
25822622
self(),
25832623
SubscriptionId),
@@ -2607,6 +2647,20 @@ maybe_notify_consumer(Transport,
26072647
Connection#stream_connection{correlation_id_sequence = CorrIdSeq + 1,
26082648
outstanding_requests = OutstandingRequests1}.
26092649

2650+
maybe_unregister_consumer(_, _, false = _Sac) ->
2651+
ok;
2652+
maybe_unregister_consumer(VirtualHost,
2653+
#consumer{stream = Stream,
2654+
properties = Properties,
2655+
subscription_id = SubscriptionId},
2656+
true = _Sac) ->
2657+
ConsumerName = consumer_name(Properties),
2658+
rabbit_stream_sac_coordinator:unregister_consumer(VirtualHost,
2659+
Stream,
2660+
ConsumerName,
2661+
self(),
2662+
SubscriptionId).
2663+
26102664
notify_connection_closed(#statem_data{connection =
26112665
#stream_connection{name = Name,
26122666
publishers =
@@ -2757,7 +2811,8 @@ lookup_leader_from_manager(VirtualHost, Stream) ->
27572811
rabbit_stream_manager:lookup_leader(VirtualHost, Stream).
27582812

27592813
remove_subscription(SubscriptionId,
2760-
#stream_connection{stream_subscriptions =
2814+
#stream_connection{virtual_host = VirtualHost,
2815+
stream_subscriptions =
27612816
StreamSubscriptions} =
27622817
Connection,
27632818
#stream_connection_state{consumers = Consumers} = State) ->
@@ -2782,6 +2837,8 @@ remove_subscription(SubscriptionId,
27822837
rabbit_stream_metrics:consumer_cancelled(self(),
27832838
stream_r(Stream, Connection2),
27842839
SubscriptionId),
2840+
maybe_unregister_consumer(VirtualHost, Consumer,
2841+
single_active_consumer(Consumer#consumer.properties)),
27852842
{Connection2, State#stream_connection_state{consumers = Consumers1}}.
27862843

27872844
maybe_clean_connection_from_stream(Stream,

deps/rabbitmq_stream/src/rabbit_stream_sac_coordinator.erl

Lines changed: 121 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,39 @@
2727
handle_info/2,
2828
terminate/2,
2929
code_change/3]).
30-
-export([register_consumer/4]).
30+
-export([register_consumer/5,
31+
unregister_consumer/5]).
3132

33+
-type vhost() :: binary().
3234
-type stream() :: binary().
3335
-type consumer_name() :: binary().
3436
-type subscription_id() :: byte().
3537

3638
-record(consumer,
3739
{pid :: pid(), subscription_id :: subscription_id()}).
3840
-record(group, {consumers :: [#consumer{}]}).
39-
-record(stream_groups, {groups :: #{consumer_name() => #group{}}}).
40-
-record(state, {stream_groups :: #{stream() => #stream_groups{}}}).
41+
-record(state,
42+
{groups :: #{{vhost(), stream(), consumer_name()} => #group{}}}).
4143

42-
register_consumer(Stream,
44+
register_consumer(VirtualHost,
45+
Stream,
4346
ConsumerName,
4447
ConnectionPid,
4548
SubscriptionId) ->
4649
call({register_consumer,
50+
VirtualHost,
51+
Stream,
52+
ConsumerName,
53+
ConnectionPid,
54+
SubscriptionId}).
55+
56+
unregister_consumer(VirtualHost,
57+
Stream,
58+
ConsumerName,
59+
ConnectionPid,
60+
SubscriptionId) ->
61+
call({unregister_consumer,
62+
VirtualHost,
4763
Stream,
4864
ConsumerName,
4965
ConnectionPid,
@@ -86,7 +102,7 @@ start_link() ->
86102
%% @end
87103
%%--------------------------------------------------------------------
88104
init([]) ->
89-
{ok, #state{stream_groups = #{}}}.
105+
{ok, #state{groups = #{}}}.
90106

91107
%%--------------------------------------------------------------------
92108
%% @private
@@ -103,50 +119,110 @@ init([]) ->
103119
%% @end
104120
%%--------------------------------------------------------------------
105121
handle_call({register_consumer,
122+
VirtualHost,
106123
Stream,
107124
ConsumerName,
108125
ConnectionPid,
109126
SubscriptionId},
110-
_From, #state{stream_groups = StreamGroups0} = State) ->
127+
_From, #state{groups = StreamGroups0} = State) ->
111128
StreamGroups1 =
112-
maybe_create_group(Stream, ConsumerName, StreamGroups0),
113-
Group0 = lookup_group(Stream, ConsumerName, StreamGroups1),
129+
maybe_create_group(VirtualHost, Stream, ConsumerName, StreamGroups0),
130+
Group0 =
131+
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups1),
114132
Consumer =
115133
#consumer{pid = ConnectionPid, subscription_id = SubscriptionId},
116134
Group = add_to_group(Consumer, Group0),
117135
Active = is_active(Consumer, Group),
118136
StreamGroups2 =
119-
update_groups(Stream, ConsumerName, Group, StreamGroups1),
120-
{reply, {ok, Active}, State#state{stream_groups = StreamGroups2}};
137+
update_groups(VirtualHost,
138+
Stream,
139+
ConsumerName,
140+
Group,
141+
StreamGroups1),
142+
{reply, {ok, Active}, State#state{groups = StreamGroups2}};
143+
handle_call({unregister_consumer,
144+
VirtualHost,
145+
Stream,
146+
ConsumerName,
147+
ConnectionPid,
148+
SubscriptionId},
149+
_From, #state{groups = StreamGroups0} = State0) ->
150+
State1 =
151+
case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of
152+
error ->
153+
State0;
154+
Group0 ->
155+
#group{consumers = Consumers0} = Group0,
156+
Consumers1 =
157+
case lists:search(fun(#consumer{pid = ConnPid,
158+
subscription_id = SubId}) ->
159+
ConnPid == ConnectionPid
160+
andalso SubId == SubscriptionId
161+
end,
162+
Consumers0)
163+
of
164+
{value, Consumer} ->
165+
rabbit_log:debug("Unregistering consumer ~p from group",
166+
[Consumer]),
167+
case lists:nth(1, Consumers0) of
168+
Consumer ->
169+
rabbit_log:debug("Unregistering the active consumer"),
170+
%% this is active one, remove it and notify the new active one if group not empty
171+
Cs = lists:delete(Consumer, Consumers0),
172+
case Cs of
173+
[] ->
174+
%% group is empty now
175+
rabbit_log:debug("Group is now empty"),
176+
ok;
177+
_ ->
178+
%% get new active one (the first) and notify it
179+
#consumer{pid = Pid,
180+
subscription_id = SubId} =
181+
lists:nth(1, Cs),
182+
rabbit_log:debug("New active consumer is ~p ~p",
183+
[Pid, SubId]),
184+
Pid
185+
! {sac,
186+
{{subscription_id, SubId},
187+
{active, true}}}
188+
end,
189+
Cs;
190+
_ActiveConsumer ->
191+
rabbit_log:debug("Not the active consumer, just removing from the "
192+
"group"),
193+
lists:delete(Consumer, Consumers0)
194+
end;
195+
error ->
196+
rabbit_log:debug("Could not find consumer ~p ~p in group ~p ~p ~p",
197+
[ConnectionPid,
198+
SubscriptionId,
199+
VirtualHost,
200+
Stream,
201+
ConsumerName]),
202+
Consumers0
203+
end,
204+
SGS = update_groups(VirtualHost,
205+
Stream,
206+
ConsumerName,
207+
Group0#group{consumers = Consumers1},
208+
StreamGroups0),
209+
State0#state{groups = SGS}
210+
end,
211+
{reply, ok, State1};
121212
handle_call(which_children, _From, State) ->
122213
{reply, [], State}.
123214

124-
maybe_create_group(Stream, ConsumerName, StreamGroups) ->
215+
maybe_create_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->
125216
case StreamGroups of
126-
#{Stream := #stream_groups{groups = #{ConsumerName := _Consumers}}} ->
127-
%% the group already exists
217+
#{{VirtualHost, Stream, ConsumerName} := _Group} ->
128218
StreamGroups;
129-
#{Stream := #stream_groups{groups = GroupsForTheStream} = SG} ->
130-
%% there are groups for this streams, but not one for this consumer name
131-
GroupsForTheStream1 =
132-
maps:put(ConsumerName, #group{consumers = []},
133-
GroupsForTheStream),
134-
StreamGroups#{Stream =>
135-
SG#stream_groups{groups = GroupsForTheStream1}};
136219
SGS ->
137-
SG = maps:get(Stream, SGS, #stream_groups{groups = #{}}),
138-
#stream_groups{groups = Groups} = SG,
139-
Groups1 = maps:put(ConsumerName, #group{consumers = []}, Groups),
140-
SGS#{Stream => SG#stream_groups{groups = Groups1}}
220+
maps:put({VirtualHost, Stream, ConsumerName},
221+
#group{consumers = []}, SGS)
141222
end.
142223

143-
lookup_group(Stream, ConsumerName, StreamGroups) ->
144-
case StreamGroups of
145-
#{Stream := #stream_groups{groups = #{ConsumerName := Group}}} ->
146-
Group;
147-
_ ->
148-
error
149-
end.
224+
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->
225+
maps:get({VirtualHost, Stream, ConsumerName}, StreamGroups).
150226

151227
add_to_group(Consumer, #group{consumers = Consumers} = Group) ->
152228
Group#group{consumers = Consumers ++ [Consumer]}.
@@ -158,10 +234,20 @@ is_active(Consumer, #group{consumers = [Consumer | _]}) ->
158234
is_active(_, _) ->
159235
false.
160236

161-
update_groups(Stream, ConsumerName, Group, StreamGroups) ->
162-
#{Stream := #stream_groups{groups = Groups}} = StreamGroups,
163-
Groups1 = maps:put(ConsumerName, Group, Groups),
164-
StreamGroups#{Stream => #stream_groups{groups = Groups1}}.
237+
update_groups(VirtualHost,
238+
Stream,
239+
ConsumerName,
240+
#group{consumers = []},
241+
StreamGroups) ->
242+
rabbit_log:debug("Group ~p ~p ~p is now empty, removing it",
243+
[VirtualHost, Stream, ConsumerName]),
244+
maps:remove({VirtualHost, Stream, ConsumerName}, StreamGroups);
245+
update_groups(VirtualHost,
246+
Stream,
247+
ConsumerName,
248+
Group,
249+
StreamGroups) ->
250+
maps:put({VirtualHost, Stream, ConsumerName}, Group, StreamGroups).
165251

166252
handle_cast(_Msg, State) ->
167253
{noreply, State}.

0 commit comments

Comments
 (0)