Skip to content

Commit 19aa8d4

Browse files
Merge pull request #8458 from rabbitmq/mergify/bp/v3.11.x/pr-8457
Handle missing delivery marker in CQ v1 index (backport #8453) (backport #8457)
2 parents 43c5d16 + 8358b22 commit 19aa8d4

File tree

3 files changed

+263
-3
lines changed

3 files changed

+263
-3
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ rabbitmq_integration_suite(
309309
rabbitmq_integration_suite(
310310
name = "classic_queue_prop_SUITE",
311311
size = "large",
312-
shard_count = 5,
312+
shard_count = 8,
313313
sharding_method = "case",
314314
deps = [
315315
"@proper//:erlang_app",

deps/rabbit/src/rabbit_queue_index.erl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -966,6 +966,10 @@ action_to_entry(RelSeq, Action, JEntries) ->
966966
({no_pub, del, no_ack}) when Action == ack ->
967967
{set, {no_pub, del, ack}};
968968
({?PUB, del, no_ack}) when Action == ack ->
969+
{reset, none};
970+
%% Special case, missing del
971+
%% See journal_minus_segment1/2
972+
({?PUB, no_del, no_ack}) when Action == ack ->
969973
{reset, none}
970974
end.
971975

@@ -1356,6 +1360,11 @@ segment_plus_journal1({?PUB = Pub, no_del, no_ack}, {no_pub, del, no_ack}) ->
13561360
segment_plus_journal1({?PUB, no_del, no_ack}, {no_pub, del, ack}) ->
13571361
{undefined, -1};
13581362
segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) ->
1363+
{undefined, -1};
1364+
1365+
%% Special case, missing del
1366+
%% See journal_minus_segment1/2
1367+
segment_plus_journal1({?PUB, no_del, no_ack}, {no_pub, no_del, ack}) ->
13591368
{undefined, -1}.
13601369

13611370
%% Remove from the journal entries for a segment, items that are
@@ -1427,6 +1436,16 @@ journal_minus_segment1({no_pub, no_del, ack}, {?PUB, del, no_ack}) ->
14271436
journal_minus_segment1({no_pub, no_del, ack}, {?PUB, del, ack}) ->
14281437
{undefined, -1};
14291438

1439+
%% Just ack in journal, missing del
1440+
%% Since 3.10 message delivery is tracked per-queue, not per-message,
1441+
%% but to keep queue index v1 format messages are always marked as
1442+
%% delivered on publish. But for a message that was published before
1443+
%% 3.10 this is not the case and the delivery marker can be missing.
1444+
%% As a workaround we add the del marker because if a message is acked
1445+
%% it must have been delivered as well.
1446+
journal_minus_segment1({no_pub, no_del, ack}, {?PUB, no_del, no_ack}) ->
1447+
{{no_pub, del, ack}, 0};
1448+
14301449
%% Deliver and ack in journal
14311450
journal_minus_segment1({no_pub, del, ack}, {?PUB, no_del, no_ack}) ->
14321451
{keep, 0};

deps/rabbit/test/classic_queue_prop_SUITE.erl

Lines changed: 243 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ groups() ->
8585
lazy_queue_v2
8686
]},
8787
{classic_queue_regressions, [], [
88-
reg_v1_full_recover_only_journal
88+
reg_v1_full_recover_only_journal,
89+
reg_v1_no_del_jif,
90+
reg_v1_no_del_idx,
91+
reg_v1_no_del_idx_unclean
8992
]}
9093
].
9194

@@ -1191,6 +1194,244 @@ do_reg_v1_full_recover_only_journal(Config) ->
11911194

11921195
Res15 = cmd_restart_vhost_clean(St14),
11931196
true = postcondition(St14, {call, undefined, cmd_restart_vhost_clean, [St14]}, Res15),
1194-
_ = next_state(St14, Res15, {call, undefined, cmd_restart_vhost_clean, [St14]}),
1197+
St15 = next_state(St14, Res15, {call, undefined, cmd_restart_vhost_clean, [St14]}),
1198+
1199+
cmd_teardown_queue(St15),
11951200

11961201
true.
1202+
1203+
%% The following reg_v1_no_del_* cases test when a classic queue has a
1204+
%% published message before an upgrade to 3.10. In that case there is
1205+
%% no delivery marker in the v1 queue index.
1206+
1207+
%% After upgrade to 3.10 there is a published message in the journal file.
1208+
%% Consuming and acknowledging the message should work fine.
1209+
reg_v1_no_del_jif(Config) ->
1210+
try
1211+
true = rabbit_ct_broker_helpers:rpc(
1212+
Config, 0, ?MODULE, do_reg_v1_no_del_jif, [Config])
1213+
catch exit:{exception, Reason} ->
1214+
exit(Reason)
1215+
end.
1216+
1217+
do_reg_v1_no_del_jif(Config) ->
1218+
St0 = #cq{name=prop_classic_queue_v1, mode=lazy, version=1,
1219+
config=minimal_config(Config)},
1220+
1221+
Res1 = cmd_setup_queue(St0),
1222+
St3 = St0#cq{amq=Res1},
1223+
1224+
{St4, Ch} = cmd(cmd_channel_open, St3, []),
1225+
1226+
%% Simulate pre-3.10.0 behaviour by making deliver a noop
1227+
ok = meck:new(rabbit_queue_index, [passthrough]),
1228+
ok = meck:expect(rabbit_queue_index, deliver, fun(_, State) -> State end),
1229+
1230+
{St5, _Res5} = cmd(cmd_channel_publish, St4, [Ch, 4, _Persistent = 2, _NotMandatory = false, _NoExpiration = undefined]),
1231+
1232+
%% Enforce syncing journal to disk
1233+
%% (Not strictly necessary as vhost restart also triggers a sync)
1234+
%% At this point there should be a publish entry in the journal and no segment files
1235+
rabbit_amqqueue:pid_of(St5#cq.amq) ! timeout,
1236+
1237+
{SyncTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end),
1238+
ct:pal("wait for sync took ~p ms", [SyncTime div 1000]),
1239+
1240+
%% Simulate RabbitMQ version upgrade by a clean vhost restart
1241+
%% (also reset delivery to normal operation)
1242+
ok = meck:delete(rabbit_queue_index, deliver, 2),
1243+
{St10, _} = cmd(cmd_restart_vhost_clean, St5, []),
1244+
1245+
meck:reset(rabbit_queue_index),
1246+
1247+
%% Consume the message and acknowledge it
1248+
%% The queue index should not crash when finding a pub+ack but no_del in the journal
1249+
%% (It used to crash in `action_to_entry/3' with a case_clause)
1250+
{St6, _Tag} = cmd(cmd_channel_consume, St10, [Ch]),
1251+
receive SomeMsg -> self() ! SomeMsg
1252+
after 5000 -> ct:fail(no_message_consumed)
1253+
end,
1254+
{St7, _Msg = #amqp_msg{}} = cmd(cmd_channel_receive_and_ack, St6, [Ch]),
1255+
1256+
%% enforce syncing journal to disk
1257+
rabbit_amqqueue:pid_of(St7#cq.amq) ! timeout,
1258+
1259+
{SyncTime2, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end),
1260+
ct:pal("wait for sync took ~p ms", [SyncTime2 div 1000]),
1261+
1262+
validate_and_teaddown(St7).
1263+
1264+
%% After upgrade to 3.10 there is a published message in a segment file.
1265+
%% Consuming and acknowledging the message inserts an ack entry in the journal file.
1266+
%% A subsequent restart (of the queue/vhost/node) should work fine.
1267+
reg_v1_no_del_idx(Config) ->
1268+
try
1269+
true = rabbit_ct_broker_helpers:rpc(
1270+
Config, 0, ?MODULE, do_reg_v1_no_del_idx, [Config])
1271+
catch exit:{exception, Reason} ->
1272+
exit(Reason)
1273+
end.
1274+
1275+
do_reg_v1_no_del_idx(Config) ->
1276+
St0 = #cq{name=prop_classic_queue_v1, mode=lazy, version=1,
1277+
config=minimal_config(Config)},
1278+
1279+
Res1 = cmd_setup_queue(St0),
1280+
St3 = St0#cq{amq=Res1},
1281+
1282+
{St4, Ch} = cmd(cmd_channel_open, St3, []),
1283+
1284+
%% Simulate pre-3.10.0 behaviour by making deliver a noop
1285+
ok = meck:new(rabbit_queue_index, [passthrough]),
1286+
ok = meck:expect(rabbit_queue_index, deliver, fun(_, State) -> State end),
1287+
1288+
ok = meck:new(rabbit_variable_queue, [passthrough]),
1289+
1290+
{St5, _Res5} = cmd(cmd_channel_publish, St4, [Ch, 4, _Persistent = 2, _NotMandatory = false, _NoExpiration = undefined]),
1291+
1292+
%% Wait for the queue process to get hibernated
1293+
%% handle_pre_hibernate syncs and flushes the journal
1294+
%% At this point there should be a publish entry in the segment file and an empty journal
1295+
{Time, ok} = timer:tc(fun() -> meck:wait(rabbit_variable_queue, handle_pre_hibernate, '_', 10000) end),
1296+
ct:pal("wait for hibernate took ~p ms", [Time div 1000]),
1297+
ok = meck:unload(rabbit_variable_queue),
1298+
1299+
%% Simulate RabbitMQ version upgrade by a clean vhost restart
1300+
%% (also reset delivery to normal operation)
1301+
ok = meck:delete(rabbit_queue_index, deliver, 2),
1302+
{St10, _} = cmd(cmd_restart_vhost_clean, St5, []),
1303+
1304+
%% Consume the message and acknowledge it
1305+
{St6, _Tag} = cmd(cmd_channel_consume, St10, [Ch]),
1306+
receive SomeMsg -> self() ! SomeMsg
1307+
after 5000 -> ct:fail(no_message_consumed)
1308+
end,
1309+
{St7, _Msg = #amqp_msg{}} = cmd(cmd_channel_receive_and_ack, St6, [Ch]),
1310+
1311+
meck:reset(rabbit_queue_index),
1312+
1313+
%% enforce syncing journal to disk
1314+
%% At this point there should be a publish entry in the segment file and an ack in the journal
1315+
rabbit_amqqueue:pid_of(St7#cq.amq) ! timeout,
1316+
{SyncTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end),
1317+
ct:pal("wait for sync took ~p ms", [SyncTime div 1000]),
1318+
1319+
meck:reset(rabbit_queue_index),
1320+
1321+
%% Another clean vhost restart
1322+
%% The queue index should not crash when finding a pub in a
1323+
%% segment, an ack in the journal, but no_del
1324+
%% (It used to crash in `segment_plus_journal1/2' with a function_clause)
1325+
catch cmd(cmd_restart_vhost_clean, St7, []),
1326+
1327+
{ReadTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, read, '_', 1000) end),
1328+
ct:pal("wait for queue read took ~p ms", [ReadTime div 1000]),
1329+
1330+
validate_and_teaddown(St7).
1331+
1332+
%% After upgrade to 3.10 there is a published message in a segment file.
1333+
%% Consuming and acknowledging the message inserts an ack entry in the journal file.
1334+
%% The recovery after a subsequent unclean shutdown (of the queue/vhost/node) should work fine.
1335+
reg_v1_no_del_idx_unclean(Config) ->
1336+
try
1337+
true = rabbit_ct_broker_helpers:rpc(
1338+
Config, 0, ?MODULE, do_reg_v1_no_del_idx_unclean, [Config])
1339+
catch exit:{exception, Reason} ->
1340+
exit(Reason)
1341+
end.
1342+
1343+
do_reg_v1_no_del_idx_unclean(Config) ->
1344+
St0 = #cq{name=prop_classic_queue_v1, mode=lazy, version=1,
1345+
config=minimal_config(Config)},
1346+
1347+
Res1 = cmd_setup_queue(St0),
1348+
St3 = St0#cq{amq=Res1},
1349+
1350+
{St4, Ch} = cmd(cmd_channel_open, St3, []),
1351+
1352+
%% Simulate pre-3.10.0 behaviour by making deliver a noop
1353+
ok = meck:new(rabbit_queue_index, [passthrough]),
1354+
ok = meck:expect(rabbit_queue_index, deliver, fun(_, State) -> State end),
1355+
1356+
ok = meck:new(rabbit_variable_queue, [passthrough]),
1357+
1358+
{St5, _Res5} = cmd(cmd_channel_publish, St4, [Ch, 4, _Persistent = 2, _NotMandatory = false, _NoExpiration = undefined]),
1359+
1360+
%% Wait for the queue process to get hibernated
1361+
%% handle_pre_hibernate syncs and flushes the journal
1362+
%% At this point there should be a publish entry in the segment file and an empty journal
1363+
{Time, ok} = timer:tc(fun() -> meck:wait(rabbit_variable_queue, handle_pre_hibernate, '_', 10000) end),
1364+
ct:pal("wait for hibernate took ~p ms", [Time div 1000]),
1365+
ok = meck:unload(rabbit_variable_queue),
1366+
1367+
%% Simulate RabbitMQ version upgrade by a clean vhost restart
1368+
%% (also reset delivery to normal operation)
1369+
ok = meck:delete(rabbit_queue_index, deliver, 2),
1370+
{St10, _} = cmd(cmd_restart_vhost_clean, St5, []),
1371+
1372+
%% Consume the message and acknowledge it
1373+
{St6, _Tag} = cmd(cmd_channel_consume, St10, [Ch]),
1374+
receive SomeMsg -> self() ! SomeMsg
1375+
after 5000 -> ct:fail(no_message_consumed)
1376+
end,
1377+
meck:reset(rabbit_queue_index),
1378+
{St7, _Msg = #amqp_msg{}} = cmd(cmd_channel_receive_and_ack, St6, [Ch]),
1379+
1380+
%% (need to ensure that the queue processed the ack before triggering the sync)
1381+
{AckTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, ack, '_', 1000) end),
1382+
ct:pal("wait for ack took ~p ms", [AckTime div 1000]),
1383+
1384+
%% enforce syncing journal to disk
1385+
%% At this point there should be a publish entry in the segment file and an ack in the journal
1386+
rabbit_amqqueue:pid_of(St7#cq.amq) ! timeout,
1387+
{SyncTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end),
1388+
ct:pal("wait for sync took ~p ms", [SyncTime div 1000]),
1389+
1390+
meck:reset(rabbit_queue_index),
1391+
1392+
%% Recovery after unclean queue shutdown
1393+
%% The queue index should not crash when finding a pub in a
1394+
%% segment, an ack in the journal, but no_del
1395+
%% (It used to crash in `journal_minus_segment1/2' with a function_clause)
1396+
{St20, _} = cmd(cmd_restart_queue_dirty, St7, []),
1397+
1398+
{RecoverTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, recover, '_', 1000) end),
1399+
ct:pal("wait for queue recover took ~p ms", [RecoverTime div 1000]),
1400+
1401+
validate_and_teaddown(St20).
1402+
1403+
cmd(CmdName, StIn, ExtraArgs) ->
1404+
Res0 = apply(?MODULE, CmdName, [StIn | ExtraArgs]),
1405+
true = postcondition(StIn, {call, undefined, CmdName, [StIn | ExtraArgs]}, Res0),
1406+
StOut = next_state(StIn, Res0, {call, undefined, CmdName, [StIn | ExtraArgs]}),
1407+
{StOut, Res0}.
1408+
1409+
validate_and_teaddown(St) ->
1410+
try
1411+
case meck:validate(rabbit_queue_index) of
1412+
true ->
1413+
true;
1414+
false ->
1415+
FailedCalls =
1416+
[Hist || Hist = {_CallerPid, _MFA, _Class, _Reason, _ST}
1417+
<- meck:history(rabbit_queue_index)],
1418+
ct:pal("Failed call(s) to rabbit_queue_index:~n~p", [FailedCalls]),
1419+
1420+
{_, _, _, _, [{_M, F, _A, _Loc}|_]} = hd(FailedCalls),
1421+
ct:fail({queue_index_crashed, F})
1422+
end
1423+
after
1424+
ok = meck:unload(rabbit_queue_index),
1425+
safe_teardown_queue(St)
1426+
end.
1427+
1428+
safe_teardown_queue(St) ->
1429+
try cmd_teardown_queue(St)
1430+
catch _:_ ->
1431+
%% It is possible that asking a queue process in cyclic
1432+
%% crashing to stop fails.
1433+
VHostDir = rabbit_vhost:msg_store_dir_path(<<"/">>),
1434+
[ok = file:delete(QIFile)
1435+
|| QIFile <- filelib:wildcard(filename:join(VHostDir, "queues/*/*"))],
1436+
cmd_teardown_queue(St)
1437+
end.

0 commit comments

Comments
 (0)