Skip to content

Commit 8c19092

Browse files
committed
Handle single active consumer registration
WIP. Uses a simple in-memory coordinator for now. No failover yet. References #3753
1 parent fb251ab commit 8c19092

File tree

8 files changed

+643
-102
lines changed

8 files changed

+643
-102
lines changed

deps/rabbitmq_stream/docs/PROTOCOL.adoc

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,12 @@ used to make the difference between a request (0) and a response (1). Example fo
214214
|Client
215215
|0x0019
216216
|Yes
217+
218+
|<<consumerupdate>> (experimental)
219+
|Server
220+
|0x0020
221+
|Yes
222+
217223
|===
218224

219225
=== DeclarePublisher
@@ -593,10 +599,11 @@ RouteQuery => Key Version CorrelationId RoutingKey SuperStream
593599
RoutingKey => string
594600
SuperStream => string
595601

596-
RouteResponse => Key Version CorrelationId [Stream]
602+
RouteResponse => Key Version CorrelationId ResponseCode [Stream]
597603
Key => uint16 // 0x8018
598604
Version => uint16
599605
CorrelationId => uint32
606+
ResponseCode => uint16
600607
Stream => string
601608
```
602609

@@ -611,13 +618,34 @@ PartitionsQuery => Key Version CorrelationId SuperStream
611618
CorrelationId => uint32
612619
SuperStream => string
613620

614-
PartitionsResponse => Key Version CorrelationId [Stream]
621+
PartitionsResponse => Key Version CorrelationId ResponseCode [Stream]
615622
Key => uint16 // 0x8019
616623
Version => uint16
617624
CorrelationId => uint32
625+
ResponseCode => uint16
618626
Stream => string
619627
```
620628

629+
=== Consumer Update (experimental)
630+
631+
```
632+
ConsumerUpdateQuery => Key Version CorrelationId SubscriptionId Active
633+
Key => uint16 // 0x001a
634+
Version => uint16
635+
CorrelationId => uint32
636+
SubscriptionId => uint8
637+
Active => uint8 (boolean, 0 = false, 1 = true)
638+
639+
ConsumerUpdateResponse => Key Version CorrelationId ResponseCode OffsetSpecification
640+
Key => uint16 // 0x801a
641+
Version => uint16
642+
CorrelationId => uint32
643+
ResponseCode => uint16
644+
OffsetSpecification => OffsetType Offset
645+
OffsetType => uint16 // 0 (none), 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp)
646+
Offset => uint64 (for offset) | int64 (for timestamp)
647+
```
648+
621649
== Authentication
622650

623651
Once a client is connected to the server, it initiates an authentication

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 303 additions & 87 deletions
Large diffs are not rendered by default.
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 2.0 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at https://www.mozilla.org/en-US/MPL/2.0/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
14+
%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
15+
%%
16+
17+
-module(rabbit_stream_sac_coordinator).
18+
19+
-behaviour(gen_server).
20+
21+
%% API functions
22+
-export([start_link/0]).
23+
%% gen_server callbacks
24+
-export([init/1,
25+
handle_call/3,
26+
handle_cast/2,
27+
handle_info/2,
28+
terminate/2,
29+
code_change/3]).
30+
-export([register_consumer/4]).
31+
32+
-type stream() :: binary().
33+
-type consumer_name() :: binary().
34+
-type subscription_id() :: byte().
35+
36+
-record(consumer,
37+
{pid :: pid(), subscription_id :: subscription_id()}).
38+
-record(group, {consumers :: [#consumer{}]}).
39+
-record(stream_groups, {groups :: #{consumer_name() => #group{}}}).
40+
-record(state, {stream_groups :: #{stream() => #stream_groups{}}}).
41+
42+
register_consumer(Stream,
43+
ConsumerName,
44+
ConnectionPid,
45+
SubscriptionId) ->
46+
call({register_consumer,
47+
Stream,
48+
ConsumerName,
49+
ConnectionPid,
50+
SubscriptionId}).
51+
52+
call(Request) ->
53+
gen_server:call({global, ?MODULE},
54+
Request).%%%===================================================================
55+
%%% API functions
56+
%%%===================================================================
57+
58+
%%--------------------------------------------------------------------
59+
%% @doc
60+
%% Starts the server
61+
%%
62+
%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
63+
%% @end
64+
%%--------------------------------------------------------------------
65+
start_link() ->
66+
case gen_server:start_link({global, ?MODULE}, ?MODULE, [], []) of
67+
{error, {already_started, _Pid}} ->
68+
ignore;
69+
R ->
70+
R
71+
end.
72+
73+
%%%===================================================================
74+
%%% gen_server callbacks
75+
%%%===================================================================
76+
77+
%%--------------------------------------------------------------------
78+
%% @private
79+
%% @doc
80+
%% Initializes the server
81+
%%
82+
%% @spec init(Args) -> {ok, State} |
83+
%% {ok, State, Timeout} |
84+
%% ignore |
85+
%% {stop, Reason}
86+
%% @end
87+
%%--------------------------------------------------------------------
88+
init([]) ->
89+
{ok, #state{stream_groups = #{}}}.
90+
91+
%%--------------------------------------------------------------------
92+
%% @private
93+
%% @doc
94+
%% Handling call messages
95+
%%
96+
%% @spec handle_call(Request, From, State) ->
97+
%% {reply, Reply, State} |
98+
%% {reply, Reply, State, Timeout} |
99+
%% {noreply, State} |
100+
%% {noreply, State, Timeout} |
101+
%% {stop, Reason, Reply, State} |
102+
%% {stop, Reason, State}
103+
%% @end
104+
%%--------------------------------------------------------------------
105+
handle_call({register_consumer,
106+
Stream,
107+
ConsumerName,
108+
ConnectionPid,
109+
SubscriptionId},
110+
_From, #state{stream_groups = StreamGroups0} = State) ->
111+
StreamGroups1 =
112+
maybe_create_group(Stream, ConsumerName, StreamGroups0),
113+
Group0 = lookup_group(Stream, ConsumerName, StreamGroups1),
114+
Consumer =
115+
#consumer{pid = ConnectionPid, subscription_id = SubscriptionId},
116+
Group = add_to_group(Consumer, Group0),
117+
Active = is_active(Consumer, Group),
118+
StreamGroups2 =
119+
update_groups(Stream, ConsumerName, Group, StreamGroups1),
120+
{reply, {ok, Active}, State#state{stream_groups = StreamGroups2}};
121+
handle_call(which_children, _From, State) ->
122+
{reply, [], State}.
123+
124+
maybe_create_group(Stream, ConsumerName, StreamGroups) ->
125+
case StreamGroups of
126+
#{Stream := #stream_groups{groups = #{ConsumerName := _Consumers}}} ->
127+
%% the group already exists
128+
StreamGroups;
129+
#{Stream := #stream_groups{groups = GroupsForTheStream} = SG} ->
130+
%% there are groups for this streams, but not one for this consumer name
131+
GroupsForTheStream1 =
132+
maps:put(ConsumerName, #group{consumers = []},
133+
GroupsForTheStream),
134+
StreamGroups#{Stream =>
135+
SG#stream_groups{groups = GroupsForTheStream1}};
136+
SGS ->
137+
SG = maps:get(Stream, SGS, #stream_groups{groups = #{}}),
138+
#stream_groups{groups = Groups} = SG,
139+
Groups1 = maps:put(ConsumerName, #group{consumers = []}, Groups),
140+
SGS#{Stream => SG#stream_groups{groups = Groups1}}
141+
end.
142+
143+
lookup_group(Stream, ConsumerName, StreamGroups) ->
144+
case StreamGroups of
145+
#{Stream := #stream_groups{groups = #{ConsumerName := Group}}} ->
146+
Group;
147+
_ ->
148+
error
149+
end.
150+
151+
add_to_group(Consumer, #group{consumers = Consumers} = Group) ->
152+
Group#group{consumers = Consumers ++ [Consumer]}.
153+
154+
is_active(Consumer, #group{consumers = [Consumer]}) ->
155+
true;
156+
is_active(Consumer, #group{consumers = [Consumer | _]}) ->
157+
true;
158+
is_active(_, _) ->
159+
false.
160+
161+
update_groups(Stream, ConsumerName, Group, StreamGroups) ->
162+
#{Stream := #stream_groups{groups = Groups}} = StreamGroups,
163+
Groups1 = maps:put(ConsumerName, Group, Groups),
164+
StreamGroups#{Stream => #stream_groups{groups = Groups1}}.
165+
166+
handle_cast(_Msg, State) ->
167+
{noreply, State}.
168+
169+
%%--------------------------------------------------------------------
170+
%% @private
171+
%% @doc
172+
%% Handling cast messages
173+
%%
174+
%% @spec handle_cast(Msg, State) -> {noreply, State} |
175+
%% {noreply, State, Timeout} |
176+
%% {stop, Reason, State}
177+
%% @end
178+
%%--------------------------------------------------------------------
179+
180+
%%--------------------------------------------------------------------
181+
%% @private
182+
%% @doc
183+
%% Handling all non call/cast messages
184+
%%
185+
%% @spec handle_info(Info, State) -> {noreply, State} |
186+
%% {noreply, State, Timeout} |
187+
%% {stop, Reason, State}
188+
%% @end
189+
%%--------------------------------------------------------------------
190+
handle_info(_Info, State) ->
191+
{noreply, State}.
192+
193+
%%--------------------------------------------------------------------
194+
%% @private
195+
%% @doc
196+
%% This function is called by a gen_server when it is about to
197+
%% terminate. It should be the opposite of Module:init/1 and do any
198+
%% necessary cleaning up. When it returns, the gen_server terminates
199+
%% with Reason. The return value is ignored.
200+
%%
201+
%% @spec terminate(Reason, State) -> void()
202+
%% @end
203+
%%--------------------------------------------------------------------
204+
terminate(_Reason, _State) ->
205+
ok.
206+
207+
%%--------------------------------------------------------------------
208+
%% @private
209+
%% @doc
210+
%% Convert process state when code is changed
211+
%%
212+
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
213+
%% @end
214+
%%--------------------------------------------------------------------
215+
code_change(_OldVsn, State, _Extra) ->
216+
{ok, State}.
217+
218+
%%%===================================================================
219+
%%% Internal functions
220+
%%%===================================================================

deps/rabbitmq_stream/src/rabbit_stream_sup.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,14 @@ init([]) ->
7979
type => worker,
8080
start => {rabbit_stream_metrics_gc, start_link, []}},
8181

82+
SacCoordinator =
83+
#{id => rabbit_stream_sac_coordinator,
84+
type => worker,
85+
start => {rabbit_stream_sac_coordinator, start_link, []}},
86+
8287
{ok,
8388
{{one_for_all, 10, 10},
84-
[StreamManager, MetricsGc]
89+
[StreamManager, MetricsGc, SacCoordinator]
8590
++ listener_specs(fun tcp_listener_spec/1,
8691
[SocketOpts, ServerConfiguration, NumTcpAcceptors],
8792
Listeners)

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@
3030
-define(WAIT, 5000).
3131

3232
all() ->
33-
[{group, single_node},
34-
{group, single_node_1},
35-
{group, cluster}].
33+
[{group, single_node}, {group, single_node_1}, {group, cluster}].
3634

3735
groups() ->
3836
[{single_node, [],
@@ -64,7 +62,8 @@ init_per_suite(Config) ->
6462
end_per_suite(Config) ->
6563
Config.
6664

67-
init_per_group(Group, Config) when Group == single_node orelse Group == single_node_1 ->
65+
init_per_group(Group, Config)
66+
when Group == single_node orelse Group == single_node_1 ->
6867
Config1 =
6968
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]),
7069
Config2 =

deps/rabbitmq_stream_common/include/rabbit_stream.hrl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
-define(COMMAND_HEARTBEAT, 23).
2424
-define(COMMAND_ROUTE, 24).
2525
-define(COMMAND_PARTITIONS, 25).
26+
-define(COMMAND_CONSUMER_UPDATE, 26).
2627

2728
-define(REQUEST, 0).
2829
-define(RESPONSE, 1).
@@ -48,6 +49,7 @@
4849
-define(RESPONSE_CODE_PRECONDITION_FAILED, 17).
4950
-define(RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, 18).
5051

52+
-define(OFFSET_TYPE_NONE, 0).
5153
-define(OFFSET_TYPE_FIRST, 1).
5254
-define(OFFSET_TYPE_LAST, 2).
5355
-define(OFFSET_TYPE_NEXT, 3).

0 commit comments

Comments
 (0)