Skip to content

Commit 6d2e497

Browse files
Merge pull request #8453 from cloudamqp/cqv1_missing_del
Handle missing delivery marker in CQ v1 index
2 parents 01dc083 + 8689f57 commit 6d2e497

File tree

3 files changed

+263
-3
lines changed

3 files changed

+263
-3
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ rabbitmq_integration_suite(
311311
rabbitmq_integration_suite(
312312
name = "classic_queue_prop_SUITE",
313313
size = "large",
314-
shard_count = 3,
314+
shard_count = 6,
315315
sharding_method = "case",
316316
deps = [
317317
"@proper//:erlang_app",

deps/rabbit/src/rabbit_queue_index.erl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -952,6 +952,10 @@ action_to_entry(RelSeq, Action, JEntries) ->
952952
({no_pub, del, no_ack}) when Action == ack ->
953953
{set, {no_pub, del, ack}};
954954
({?PUB, del, no_ack}) when Action == ack ->
955+
{reset, none};
956+
%% Special case, missing del
957+
%% See journal_minus_segment1/2
958+
({?PUB, no_del, no_ack}) when Action == ack ->
955959
{reset, none}
956960
end.
957961

@@ -1342,6 +1346,11 @@ segment_plus_journal1({?PUB = Pub, no_del, no_ack}, {no_pub, del, no_ack}) ->
13421346
segment_plus_journal1({?PUB, no_del, no_ack}, {no_pub, del, ack}) ->
13431347
{undefined, -1};
13441348
segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) ->
1349+
{undefined, -1};
1350+
1351+
%% Special case, missing del
1352+
%% See journal_minus_segment1/2
1353+
segment_plus_journal1({?PUB, no_del, no_ack}, {no_pub, no_del, ack}) ->
13451354
{undefined, -1}.
13461355

13471356
%% Remove from the journal entries for a segment, items that are
@@ -1413,6 +1422,16 @@ journal_minus_segment1({no_pub, no_del, ack}, {?PUB, del, no_ack}) ->
14131422
journal_minus_segment1({no_pub, no_del, ack}, {?PUB, del, ack}) ->
14141423
{undefined, -1};
14151424

1425+
%% Just ack in journal, missing del
1426+
%% Since 3.10 message delivery is tracked per-queue, not per-message,
1427+
%% but to keep queue index v1 format messages are always marked as
1428+
%% delivered on publish. But for a message that was published before
1429+
%% 3.10 this is not the case and the delivery marker can be missing.
1430+
%% As a workaround we add the del marker because if a message is acked
1431+
%% it must have been delivered as well.
1432+
journal_minus_segment1({no_pub, no_del, ack}, {?PUB, no_del, no_ack}) ->
1433+
{{no_pub, del, ack}, 0};
1434+
14161435
%% Deliver and ack in journal
14171436
journal_minus_segment1({no_pub, del, ack}, {?PUB, no_del, no_ack}) ->
14181437
{keep, 0};

deps/rabbit/test/classic_queue_prop_SUITE.erl

Lines changed: 243 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ groups() ->
8282
classic_queue_v2
8383
]},
8484
{classic_queue_regressions, [], [
85-
reg_v1_full_recover_only_journal
85+
reg_v1_full_recover_only_journal,
86+
reg_v1_no_del_jif,
87+
reg_v1_no_del_idx,
88+
reg_v1_no_del_idx_unclean
8689
]}
8790
].
8891

@@ -1122,6 +1125,244 @@ do_reg_v1_full_recover_only_journal(Config) ->
11221125

11231126
Res15 = cmd_restart_vhost_clean(St14),
11241127
true = postcondition(St14, {call, undefined, cmd_restart_vhost_clean, [St14]}, Res15),
1125-
_ = next_state(St14, Res15, {call, undefined, cmd_restart_vhost_clean, [St14]}),
1128+
St15 = next_state(St14, Res15, {call, undefined, cmd_restart_vhost_clean, [St14]}),
1129+
1130+
cmd_teardown_queue(St15),
11261131

11271132
true.
1133+
1134+
%% The following reg_v1_no_del_* cases test when a classic queue has a
1135+
%% published message before an upgrade to 3.10. In that case there is
1136+
%% no delivery marker in the v1 queue index.
1137+
1138+
%% After upgrade to 3.10 there is a published message in the journal file.
1139+
%% Consuming and acknowledging the message should work fine.
1140+
reg_v1_no_del_jif(Config) ->
1141+
try
1142+
true = rabbit_ct_broker_helpers:rpc(
1143+
Config, 0, ?MODULE, do_reg_v1_no_del_jif, [Config])
1144+
catch exit:{exception, Reason} ->
1145+
exit(Reason)
1146+
end.
1147+
1148+
do_reg_v1_no_del_jif(Config) ->
1149+
St0 = #cq{name=prop_classic_queue_v1, version=1,
1150+
config=minimal_config(Config)},
1151+
1152+
Res1 = cmd_setup_queue(St0),
1153+
St3 = St0#cq{amq=Res1},
1154+
1155+
{St4, Ch} = cmd(cmd_channel_open, St3, []),
1156+
1157+
%% Simulate pre-3.10.0 behaviour by making deliver a noop
1158+
ok = meck:new(rabbit_queue_index, [passthrough]),
1159+
ok = meck:expect(rabbit_queue_index, deliver, fun(_, State) -> State end),
1160+
1161+
{St5, _Res5} = cmd(cmd_channel_publish, St4, [Ch, 4, _Persistent = 2, _NotMandatory = false, _NoExpiration = undefined]),
1162+
1163+
%% Enforce syncing journal to disk
1164+
%% (Not strictly necessary as vhost restart also triggers a sync)
1165+
%% At this point there should be a publish entry in the journal and no segment files
1166+
rabbit_amqqueue:pid_of(St5#cq.amq) ! timeout,
1167+
1168+
{SyncTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end),
1169+
ct:pal("wait for sync took ~p ms", [SyncTime div 1000]),
1170+
1171+
%% Simulate RabbitMQ version upgrade by a clean vhost restart
1172+
%% (also reset delivery to normal operation)
1173+
ok = meck:delete(rabbit_queue_index, deliver, 2),
1174+
{St10, _} = cmd(cmd_restart_vhost_clean, St5, []),
1175+
1176+
meck:reset(rabbit_queue_index),
1177+
1178+
%% Consume the message and acknowledge it
1179+
%% The queue index should not crash when finding a pub+ack but no_del in the journal
1180+
%% (It used to crash in `action_to_entry/3' with a case_clause)
1181+
{St6, _Tag} = cmd(cmd_channel_consume, St10, [Ch]),
1182+
receive SomeMsg -> self() ! SomeMsg
1183+
after 5000 -> ct:fail(no_message_consumed)
1184+
end,
1185+
{St7, _Msg = #amqp_msg{}} = cmd(cmd_channel_receive_and_ack, St6, [Ch]),
1186+
1187+
%% enforce syncing journal to disk
1188+
rabbit_amqqueue:pid_of(St7#cq.amq) ! timeout,
1189+
1190+
{SyncTime2, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end),
1191+
ct:pal("wait for sync took ~p ms", [SyncTime2 div 1000]),
1192+
1193+
validate_and_teaddown(St7).
1194+
1195+
%% After upgrade to 3.10 there is a published message in a segment file.
1196+
%% Consuming and acknowledging the message inserts an ack entry in the journal file.
1197+
%% A subsequent restart (of the queue/vhost/node) should work fine.
1198+
reg_v1_no_del_idx(Config) ->
1199+
try
1200+
true = rabbit_ct_broker_helpers:rpc(
1201+
Config, 0, ?MODULE, do_reg_v1_no_del_idx, [Config])
1202+
catch exit:{exception, Reason} ->
1203+
exit(Reason)
1204+
end.
1205+
1206+
do_reg_v1_no_del_idx(Config) ->
1207+
St0 = #cq{name=prop_classic_queue_v1, version=1,
1208+
config=minimal_config(Config)},
1209+
1210+
Res1 = cmd_setup_queue(St0),
1211+
St3 = St0#cq{amq=Res1},
1212+
1213+
{St4, Ch} = cmd(cmd_channel_open, St3, []),
1214+
1215+
%% Simulate pre-3.10.0 behaviour by making deliver a noop
1216+
ok = meck:new(rabbit_queue_index, [passthrough]),
1217+
ok = meck:expect(rabbit_queue_index, deliver, fun(_, State) -> State end),
1218+
1219+
ok = meck:new(rabbit_variable_queue, [passthrough]),
1220+
1221+
{St5, _Res5} = cmd(cmd_channel_publish, St4, [Ch, 4, _Persistent = 2, _NotMandatory = false, _NoExpiration = undefined]),
1222+
1223+
%% Wait for the queue process to get hibernated
1224+
%% handle_pre_hibernate syncs and flushes the journal
1225+
%% At this point there should be a publish entry in the segment file and an empty journal
1226+
{Time, ok} = timer:tc(fun() -> meck:wait(rabbit_variable_queue, handle_pre_hibernate, '_', 10000) end),
1227+
ct:pal("wait for hibernate took ~p ms", [Time div 1000]),
1228+
ok = meck:unload(rabbit_variable_queue),
1229+
1230+
%% Simulate RabbitMQ version upgrade by a clean vhost restart
1231+
%% (also reset delivery to normal operation)
1232+
ok = meck:delete(rabbit_queue_index, deliver, 2),
1233+
{St10, _} = cmd(cmd_restart_vhost_clean, St5, []),
1234+
1235+
%% Consume the message and acknowledge it
1236+
{St6, _Tag} = cmd(cmd_channel_consume, St10, [Ch]),
1237+
receive SomeMsg -> self() ! SomeMsg
1238+
after 5000 -> ct:fail(no_message_consumed)
1239+
end,
1240+
{St7, _Msg = #amqp_msg{}} = cmd(cmd_channel_receive_and_ack, St6, [Ch]),
1241+
1242+
meck:reset(rabbit_queue_index),
1243+
1244+
%% enforce syncing journal to disk
1245+
%% At this point there should be a publish entry in the segment file and an ack in the journal
1246+
rabbit_amqqueue:pid_of(St7#cq.amq) ! timeout,
1247+
{SyncTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end),
1248+
ct:pal("wait for sync took ~p ms", [SyncTime div 1000]),
1249+
1250+
meck:reset(rabbit_queue_index),
1251+
1252+
%% Another clean vhost restart
1253+
%% The queue index should not crash when finding a pub in a
1254+
%% segment, an ack in the journal, but no_del
1255+
%% (It used to crash in `segment_plus_journal1/2' with a function_clause)
1256+
catch cmd(cmd_restart_vhost_clean, St7, []),
1257+
1258+
{ReadTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, read, '_', 1000) end),
1259+
ct:pal("wait for queue read took ~p ms", [ReadTime div 1000]),
1260+
1261+
validate_and_teaddown(St7).
1262+
1263+
%% After upgrade to 3.10 there is a published message in a segment file.
1264+
%% Consuming and acknowledging the message inserts an ack entry in the journal file.
1265+
%% The recovery after a subsequent unclean shutdown (of the queue/vhost/node) should work fine.
1266+
reg_v1_no_del_idx_unclean(Config) ->
1267+
try
1268+
true = rabbit_ct_broker_helpers:rpc(
1269+
Config, 0, ?MODULE, do_reg_v1_no_del_idx_unclean, [Config])
1270+
catch exit:{exception, Reason} ->
1271+
exit(Reason)
1272+
end.
1273+
1274+
do_reg_v1_no_del_idx_unclean(Config) ->
1275+
St0 = #cq{name=prop_classic_queue_v1, version=1,
1276+
config=minimal_config(Config)},
1277+
1278+
Res1 = cmd_setup_queue(St0),
1279+
St3 = St0#cq{amq=Res1},
1280+
1281+
{St4, Ch} = cmd(cmd_channel_open, St3, []),
1282+
1283+
%% Simulate pre-3.10.0 behaviour by making deliver a noop
1284+
ok = meck:new(rabbit_queue_index, [passthrough]),
1285+
ok = meck:expect(rabbit_queue_index, deliver, fun(_, State) -> State end),
1286+
1287+
ok = meck:new(rabbit_variable_queue, [passthrough]),
1288+
1289+
{St5, _Res5} = cmd(cmd_channel_publish, St4, [Ch, 4, _Persistent = 2, _NotMandatory = false, _NoExpiration = undefined]),
1290+
1291+
%% Wait for the queue process to get hibernated
1292+
%% handle_pre_hibernate syncs and flushes the journal
1293+
%% At this point there should be a publish entry in the segment file and an empty journal
1294+
{Time, ok} = timer:tc(fun() -> meck:wait(rabbit_variable_queue, handle_pre_hibernate, '_', 10000) end),
1295+
ct:pal("wait for hibernate took ~p ms", [Time div 1000]),
1296+
ok = meck:unload(rabbit_variable_queue),
1297+
1298+
%% Simulate RabbitMQ version upgrade by a clean vhost restart
1299+
%% (also reset delivery to normal operation)
1300+
ok = meck:delete(rabbit_queue_index, deliver, 2),
1301+
{St10, _} = cmd(cmd_restart_vhost_clean, St5, []),
1302+
1303+
%% Consume the message and acknowledge it
1304+
{St6, _Tag} = cmd(cmd_channel_consume, St10, [Ch]),
1305+
receive SomeMsg -> self() ! SomeMsg
1306+
after 5000 -> ct:fail(no_message_consumed)
1307+
end,
1308+
meck:reset(rabbit_queue_index),
1309+
{St7, _Msg = #amqp_msg{}} = cmd(cmd_channel_receive_and_ack, St6, [Ch]),
1310+
1311+
%% (need to ensure that the queue processed the ack before triggering the sync)
1312+
{AckTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, ack, '_', 1000) end),
1313+
ct:pal("wait for ack took ~p ms", [AckTime div 1000]),
1314+
1315+
%% enforce syncing journal to disk
1316+
%% At this point there should be a publish entry in the segment file and an ack in the journal
1317+
rabbit_amqqueue:pid_of(St7#cq.amq) ! timeout,
1318+
{SyncTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end),
1319+
ct:pal("wait for sync took ~p ms", [SyncTime div 1000]),
1320+
1321+
meck:reset(rabbit_queue_index),
1322+
1323+
%% Recovery after unclean queue shutdown
1324+
%% The queue index should not crash when finding a pub in a
1325+
%% segment, an ack in the journal, but no_del
1326+
%% (It used to crash in `journal_minus_segment1/2' with a function_clause)
1327+
{St20, _} = cmd(cmd_restart_queue_dirty, St7, []),
1328+
1329+
{RecoverTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, recover, '_', 1000) end),
1330+
ct:pal("wait for queue recover took ~p ms", [RecoverTime div 1000]),
1331+
1332+
validate_and_teaddown(St20).
1333+
1334+
cmd(CmdName, StIn, ExtraArgs) ->
1335+
Res0 = apply(?MODULE, CmdName, [StIn | ExtraArgs]),
1336+
true = postcondition(StIn, {call, undefined, CmdName, [StIn | ExtraArgs]}, Res0),
1337+
StOut = next_state(StIn, Res0, {call, undefined, CmdName, [StIn | ExtraArgs]}),
1338+
{StOut, Res0}.
1339+
1340+
validate_and_teaddown(St) ->
1341+
try
1342+
case meck:validate(rabbit_queue_index) of
1343+
true ->
1344+
true;
1345+
false ->
1346+
FailedCalls =
1347+
[Hist || Hist = {_CallerPid, _MFA, _Class, _Reason, _ST}
1348+
<- meck:history(rabbit_queue_index)],
1349+
ct:pal("Failed call(s) to rabbit_queue_index:~n~p", [FailedCalls]),
1350+
1351+
{_, _, _, _, [{_M, F, _A, _Loc}|_]} = hd(FailedCalls),
1352+
ct:fail({queue_index_crashed, F})
1353+
end
1354+
after
1355+
ok = meck:unload(rabbit_queue_index),
1356+
safe_teardown_queue(St)
1357+
end.
1358+
1359+
safe_teardown_queue(St) ->
1360+
try cmd_teardown_queue(St)
1361+
catch _:_ ->
1362+
%% It is possible that asking a queue process in cyclic
1363+
%% crashing to stop fails.
1364+
VHostDir = rabbit_vhost:msg_store_dir_path(<<"/">>),
1365+
[ok = file:delete(QIFile)
1366+
|| QIFile <- filelib:wildcard(filename:join(VHostDir, "queues/*/*"))],
1367+
cmd_teardown_queue(St)
1368+
end.

0 commit comments

Comments
 (0)