Skip to content

Commit 787db9b

Browse files
committed
Compute stream partition index
References #3753
1 parent a0a93da commit 787db9b

File tree

4 files changed

+251
-25
lines changed

4 files changed

+251
-25
lines changed

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
lookup_local_member/2,
3636
topology/2,
3737
route/3,
38-
partitions/2]).
38+
partitions/2,
39+
partition_index/3]).
3940

4041
-record(state, {configuration}).
4142

@@ -116,6 +117,12 @@ route(RoutingKey, VirtualHost, SuperStream) ->
116117
partitions(VirtualHost, SuperStream) ->
117118
gen_server:call(?MODULE, {partitions, VirtualHost, SuperStream}).
118119

120+
-spec partition_index(binary(), binary(), binary()) ->
121+
{ok, integer()} | {error, stream_not_found}.
122+
partition_index(VirtualHost, SuperStream, Stream) ->
123+
gen_server:call(?MODULE,
124+
{partition_index, VirtualHost, SuperStream, Stream}).
125+
119126
stream_queue_arguments(Arguments) ->
120127
stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}],
121128
Arguments).
@@ -422,6 +429,56 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
422429
handle_call({partitions, VirtualHost, SuperStream}, _From, State) ->
423430
Res = super_stream_partitions(VirtualHost, SuperStream),
424431
{reply, Res, State};
432+
handle_call({partition_index, VirtualHost, SuperStream, Stream},
433+
_From, State) ->
434+
ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream),
435+
Res = try
436+
rabbit_exchange:lookup_or_die(ExchangeName),
437+
UnorderedBindings =
438+
[Binding
439+
|| Binding = #binding{destination = #resource{name = Q} = D}
440+
<- rabbit_binding:list_for_source(ExchangeName),
441+
is_resource_stream_queue(D), Q == Stream],
442+
OrderedBindings =
443+
rabbit_stream_utils:sort_partitions(UnorderedBindings),
444+
case OrderedBindings of
445+
[] ->
446+
{error, stream_not_found};
447+
Bindings ->
448+
Binding = lists:nth(1, Bindings),
449+
#binding{args = Args} = Binding,
450+
case rabbit_misc:table_lookup(Args,
451+
<<"x-stream-partition-order">>)
452+
of
453+
{_, Order} ->
454+
Index = rabbit_data_coercion:to_integer(Order),
455+
{ok, Index};
456+
_ ->
457+
Pattern = <<"-">>,
458+
Size = byte_size(Pattern),
459+
case string:find(Stream, Pattern, trailing) of
460+
nomatch ->
461+
{ok, -1};
462+
<<Pattern:Size/binary, Rest/binary>> ->
463+
try
464+
Index = binary_to_integer(Rest),
465+
{ok, Index}
466+
catch
467+
error:_ ->
468+
{ok, -1}
469+
end;
470+
_ ->
471+
{ok, -1}
472+
end
473+
end
474+
end
475+
catch
476+
exit:Error ->
477+
rabbit_log:error("Error while looking up exchange ~p, ~p",
478+
[ExchangeName, Error]),
479+
{error, stream_not_found}
480+
end,
481+
{reply, Res, State};
425482
handle_call(which_children, _From, State) ->
426483
{reply, [], State}.
427484

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -780,6 +780,7 @@ open(info, {sac, {{subscription_id, SubId}, {active, Active}}},
780780
{Connection1, ConnState1} =
781781
case Consumers0 of
782782
#{SubId := Consumer0} ->
783+
%% FIXME check consumer is SAC, to avoid changing a regular consumer
783784
Consumer1 = Consumer0#consumer{active = Active},
784785
Conn1 =
785786
maybe_notify_consumer(Transport,
@@ -1854,6 +1855,7 @@ handle_frame_post_auth(Transport,
18541855
Stream,
18551856
ConsumerName,
18561857
SubscriptionId,
1858+
Properties,
18571859
Sac),
18581860

18591861
Connection1 =
@@ -2614,16 +2616,19 @@ maybe_dispatch_on_subscription(_Transport,
26142616
Consumers1 = Consumers#{SubscriptionId => ConsumerState},
26152617
State#stream_connection_state{consumers = Consumers1}.
26162618

2617-
maybe_register_consumer(_, _, _, _, false = _Sac) ->
2619+
maybe_register_consumer(_, _, _, _, _, false = _Sac) ->
26182620
true;
26192621
maybe_register_consumer(VirtualHost,
26202622
Stream,
26212623
ConsumerName,
26222624
SubscriptionId,
2625+
Properties,
26232626
true) ->
2627+
PartitionIndex = partition_index(VirtualHost, Stream, Properties),
26242628
{ok, Active} =
26252629
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
26262630
Stream,
2631+
PartitionIndex,
26272632
ConsumerName,
26282633
self(),
26292634
SubscriptionId),
@@ -2667,6 +2672,21 @@ maybe_unregister_consumer(VirtualHost,
26672672
self(),
26682673
SubscriptionId).
26692674

2675+
partition_index(VirtualHost, Stream, Properties) ->
2676+
case Properties of
2677+
#{<<"super-stream">> := SuperStream} ->
2678+
case rabbit_stream_manager:partition_index(VirtualHost, SuperStream,
2679+
Stream)
2680+
of
2681+
{ok, Index} ->
2682+
Index;
2683+
_ ->
2684+
-1
2685+
end;
2686+
_ ->
2687+
-1
2688+
end.
2689+
26702690
notify_connection_closed(#statem_data{connection =
26712691
#stream_connection{name = Name,
26722692
publishers =

deps/rabbitmq_stream/src/rabbit_stream_sac_coordinator.erl

Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
handle_info/2,
2828
terminate/2,
2929
code_change/3]).
30-
-export([register_consumer/5,
30+
-export([register_consumer/6,
3131
unregister_consumer/5]).
3232

3333
-type vhost() :: binary().
@@ -36,19 +36,23 @@
3636
-type subscription_id() :: byte().
3737

3838
-record(consumer,
39-
{pid :: pid(), subscription_id :: subscription_id()}).
40-
-record(group, {consumers :: [#consumer{}]}).
39+
{pid :: pid(), subscription_id :: subscription_id(),
40+
active :: boolean()}).
41+
-record(group,
42+
{consumers :: [#consumer{}], partition_index :: integer()}).
4143
-record(state,
4244
{groups :: #{{vhost(), stream(), consumer_name()} => #group{}}}).
4345

4446
register_consumer(VirtualHost,
4547
Stream,
48+
PartitionIndex,
4649
ConsumerName,
4750
ConnectionPid,
4851
SubscriptionId) ->
4952
call({register_consumer,
5053
VirtualHost,
5154
Stream,
55+
PartitionIndex,
5256
ConsumerName,
5357
ConnectionPid,
5458
SubscriptionId}).
@@ -121,23 +125,39 @@ init([]) ->
121125
handle_call({register_consumer,
122126
VirtualHost,
123127
Stream,
128+
PartitionIndex,
124129
ConsumerName,
125130
ConnectionPid,
126131
SubscriptionId},
127132
_From, #state{groups = StreamGroups0} = State) ->
133+
%% TODO monitor connection PID to remove consumers when their connection dies
134+
%% this could require some index to avoid crawling the whole data structure
135+
%% this is necessary to fail over to another consumer when one dies abruptly
136+
%% also, check the liveliness of each consumer whenever there's a change in the group,
137+
%% to make sure to get rid of zombies
138+
%%
139+
%% TODO monitor streams and virtual hosts as well
128140
StreamGroups1 =
129-
maybe_create_group(VirtualHost, Stream, ConsumerName, StreamGroups0),
141+
maybe_create_group(VirtualHost,
142+
Stream,
143+
PartitionIndex,
144+
ConsumerName,
145+
StreamGroups0),
130146
Group0 =
131147
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups1),
132148
Consumer =
133-
#consumer{pid = ConnectionPid, subscription_id = SubscriptionId},
134-
Group = add_to_group(Consumer, Group0),
135-
Active = is_active(Consumer, Group),
149+
#consumer{pid = ConnectionPid,
150+
subscription_id = SubscriptionId,
151+
active = false},
152+
Group1 = add_to_group(Consumer, Group0),
153+
Active = compute_active_flag(Consumer, Group1),
154+
#group{consumers = Consumers0} = Group1,
155+
Consumers1 = update_active_flag(Consumer, Active, Consumers0),
136156
StreamGroups2 =
137157
update_groups(VirtualHost,
138158
Stream,
139159
ConsumerName,
140-
Group,
160+
Group1#group{consumers = Consumers1},
141161
StreamGroups1),
142162
{reply, {ok, Active}, State#state{groups = StreamGroups2}};
143163
handle_call({unregister_consumer,
@@ -164,32 +184,34 @@ handle_call({unregister_consumer,
164184
{value, Consumer} ->
165185
rabbit_log:debug("Unregistering consumer ~p from group",
166186
[Consumer]),
167-
case lists:nth(1, Consumers0) of
168-
Consumer ->
187+
case Consumer of
188+
#consumer{active = true} ->
169189
rabbit_log:debug("Unregistering the active consumer"),
170190
%% this is active one, remove it and notify the new active one if group not empty
171191
Cs = lists:delete(Consumer, Consumers0),
172192
case Cs of
173193
[] ->
174194
%% group is empty now
175195
rabbit_log:debug("Group is now empty"),
176-
ok;
196+
Cs;
177197
_ ->
178198
%% get new active one (the first) and notify it
199+
NewActive = lists:nth(1, Cs),
179200
#consumer{pid = Pid,
180201
subscription_id = SubId} =
181-
lists:nth(1, Cs),
202+
NewActive,
182203
rabbit_log:debug("New active consumer is ~p ~p",
183204
[Pid, SubId]),
184205
Pid
185206
! {sac,
186207
{{subscription_id, SubId},
187-
{active, true}}}
188-
end,
189-
Cs;
208+
{active, true}}},
209+
update_active_flag(NewActive, true,
210+
Cs)
211+
end;
190212
_ActiveConsumer ->
191-
rabbit_log:debug("Not the active consumer, just removing from the "
192-
"group"),
213+
rabbit_log:debug("Not the active consumer, just removing it from "
214+
"the group"),
193215
lists:delete(Consumer, Consumers0)
194216
end;
195217
error ->
@@ -212,13 +234,18 @@ handle_call({unregister_consumer,
212234
handle_call(which_children, _From, State) ->
213235
{reply, [], State}.
214236

215-
maybe_create_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->
237+
maybe_create_group(VirtualHost,
238+
Stream,
239+
PartitionIndex,
240+
ConsumerName,
241+
StreamGroups) ->
216242
case StreamGroups of
217243
#{{VirtualHost, Stream, ConsumerName} := _Group} ->
218244
StreamGroups;
219245
SGS ->
220246
maps:put({VirtualHost, Stream, ConsumerName},
221-
#group{consumers = []}, SGS)
247+
#group{consumers = [], partition_index = PartitionIndex},
248+
SGS)
222249
end.
223250

224251
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->
@@ -227,13 +254,23 @@ lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->
227254
add_to_group(Consumer, #group{consumers = Consumers} = Group) ->
228255
Group#group{consumers = Consumers ++ [Consumer]}.
229256

230-
is_active(Consumer, #group{consumers = [Consumer]}) ->
257+
compute_active_flag(Consumer,
258+
#group{partition_index = -1, consumers = [Consumer]}) ->
231259
true;
232-
is_active(Consumer, #group{consumers = [Consumer | _]}) ->
260+
compute_active_flag(Consumer,
261+
#group{partition_index = -1, consumers = [Consumer | _]}) ->
233262
true;
234-
is_active(_, _) ->
263+
compute_active_flag(_, _) ->
235264
false.
236265

266+
update_active_flag(Consumer, Active, Consumers) ->
267+
lists:foldl(fun (C, Acc) when C == Consumer ->
268+
Acc ++ [Consumer#consumer{active = Active}];
269+
(C, Acc) ->
270+
Acc ++ [C]
271+
end,
272+
[], Consumers).
273+
237274
update_groups(VirtualHost,
238275
Stream,
239276
ConsumerName,

0 commit comments

Comments
 (0)