Skip to content

Commit 88c6347

Browse files
committed
Notify stream consumer connection of SAC changes
References #3753
1 parent 787db9b commit 88c6347

File tree

2 files changed

+30
-23
lines changed

2 files changed

+30
-23
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -769,7 +769,10 @@ open(info, {OK, S, Data},
769769
StatemData#statem_data{connection = Connection1,
770770
connection_state = State2}}
771771
end;
772-
open(info, {sac, {{subscription_id, SubId}, {active, Active}}},
772+
open(info,
773+
{sac,
774+
{{subscription_id, SubId}, {active, Active},
775+
{side_effects, Effects}}},
773776
#statem_data{transport = Transport,
774777
connection = Connection0,
775778
connection_state = ConnState0} =
@@ -787,7 +790,8 @@ open(info, {sac, {{subscription_id, SubId}, {active, Active}}},
787790
Connection0,
788791
SubId,
789792
Active,
790-
true),
793+
true,
794+
Effects),
791795
{Conn1,
792796
ConnState0#stream_connection_state{consumers =
793797
Consumers0#{SubId =>
@@ -1850,6 +1854,11 @@ handle_frame_post_auth(Transport,
18501854
ConsumerCounters =
18511855
atomics:new(2, [{signed, false}]),
18521856

1857+
response_ok(Transport,
1858+
Connection,
1859+
subscribe,
1860+
CorrelationId),
1861+
18531862
Active =
18541863
maybe_register_consumer(VirtualHost,
18551864
Stream,
@@ -1858,13 +1867,6 @@ handle_frame_post_auth(Transport,
18581867
Properties,
18591868
Sac),
18601869

1861-
Connection1 =
1862-
maybe_notify_consumer(Transport,
1863-
Connection,
1864-
SubscriptionId,
1865-
Active,
1866-
Sac),
1867-
18681870
ConsumerState =
18691871
#consumer{member_pid = LocalMemberPid,
18701872
offset = OffsetSpec,
@@ -1877,20 +1879,15 @@ handle_frame_post_auth(Transport,
18771879
properties = Properties,
18781880
active = Active},
18791881

1880-
Connection2 =
1882+
Connection1 =
18811883
maybe_monitor_stream(LocalMemberPid, Stream,
1882-
Connection1),
1883-
1884-
response_ok(Transport,
1885-
Connection,
1886-
subscribe,
1887-
CorrelationId),
1884+
Connection),
18881885

18891886
State1 =
18901887
maybe_dispatch_on_subscription(Transport,
18911888
State,
18921889
ConsumerState,
1893-
Connection2,
1890+
Connection1,
18941891
Consumers,
18951892
Stream,
18961893
SubscriptionId,
@@ -1907,7 +1904,7 @@ handle_frame_post_auth(Transport,
19071904
StreamSubscriptions#{Stream =>
19081905
[SubscriptionId]}
19091906
end,
1910-
{Connection2#stream_connection{stream_subscriptions
1907+
{Connection1#stream_connection{stream_subscriptions
19111908
=
19121909
StreamSubscriptions1},
19131910
State1}
@@ -2400,7 +2397,8 @@ handle_frame_post_auth(Transport,
24002397
ResponseOffsetSpec}}) ->
24012398
%% FIXME check response code? It's supposed to be OK all the time.
24022399
case maps:take(CorrelationId, Requests0) of
2403-
{{{subscription_id, SubscriptionId}}, Rs} ->
2400+
{{{subscription_id, SubscriptionId}, {side_effects, _SideEffects}},
2401+
Rs} ->
24042402
rabbit_log:debug("Received consumer update response for subscription ~p",
24052403
[SubscriptionId]),
24062404
Consumers1 =
@@ -2634,7 +2632,7 @@ maybe_register_consumer(VirtualHost,
26342632
SubscriptionId),
26352633
Active.
26362634

2637-
maybe_notify_consumer(_, Connection, _, _, false = _Sac) ->
2635+
maybe_notify_consumer(_, Connection, _, _, _, false = _Sac) ->
26382636
Connection;
26392637
maybe_notify_consumer(Transport,
26402638
#stream_connection{socket = S,
@@ -2644,15 +2642,18 @@ maybe_notify_consumer(Transport,
26442642
Connection,
26452643
SubscriptionId,
26462644
Active,
2647-
true = _Sac) ->
2645+
true = _Sac,
2646+
SideEffects) ->
26482647
rabbit_log:debug("SAC subscription ~p, active = ~p",
26492648
[SubscriptionId, Active]),
26502649
Frame =
26512650
rabbit_stream_core:frame({request, CorrIdSeq,
26522651
{consumer_update, SubscriptionId, Active}}),
26532652

26542653
OutstandingRequests1 =
2655-
maps:put(CorrIdSeq, {{subscription_id, SubscriptionId}},
2654+
maps:put(CorrIdSeq,
2655+
{{subscription_id, SubscriptionId},
2656+
{side_effects, SideEffects}},
26562657
OutstandingRequests0),
26572658
send(Transport, S, Frame),
26582659
Connection#stream_connection{correlation_id_sequence = CorrIdSeq + 1,

deps/rabbitmq_stream/src/rabbit_stream_sac_coordinator.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,11 @@ handle_call({register_consumer,
159159
ConsumerName,
160160
Group1#group{consumers = Consumers1},
161161
StreamGroups1),
162+
ConnectionPid
163+
! {sac,
164+
{{subscription_id, SubscriptionId}, {active, Active},
165+
{side_effects, []}}},
166+
162167
{reply, {ok, Active}, State#state{groups = StreamGroups2}};
163168
handle_call({unregister_consumer,
164169
VirtualHost,
@@ -205,7 +210,8 @@ handle_call({unregister_consumer,
205210
Pid
206211
! {sac,
207212
{{subscription_id, SubId},
208-
{active, true}}},
213+
{active, true},
214+
{side_effects, []}}},
209215
update_active_flag(NewActive, true,
210216
Cs)
211217
end;

0 commit comments

Comments
 (0)