Skip to content

Commit 46d4279

Browse files
Merge pull request #6090 from rabbitmq/lh-msg-store
CQ shared message store improvements
2 parents a59c2b5 + 59259b2 commit 46d4279

9 files changed

+994
-1139
lines changed

deps/rabbit/src/rabbit_msg_file.erl

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
-module(rabbit_msg_file).
99

10-
-export([append/3, read/2, scan/4]).
10+
-export([append/3, read/2, pread/2, pread/3, scan/4]).
1111

1212
%%----------------------------------------------------------------------------
1313

@@ -39,6 +39,9 @@
3939

4040
append(FileHdl, MsgId, MsgBody)
4141
when is_binary(MsgId) andalso size(MsgId) =:= ?MSG_ID_SIZE_BYTES ->
42+
%% @todo I think we are actually writing MsgId TWICE: once in
43+
%% the header, once in the body. Might be better to reduce
44+
%% the size of the body...
4245
MsgBodyBin = term_to_binary(MsgBody),
4346
MsgBodyBinSize = size(MsgBodyBin),
4447
Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES,
@@ -67,6 +70,44 @@ read(FileHdl, TotalSize) ->
6770
KO -> KO
6871
end.
6972

73+
-spec pread(io_device(), position(), msg_size()) ->
74+
rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()},
75+
any()).
76+
77+
pread(FileHdl, Offset, TotalSize) ->
78+
Size = TotalSize - ?FILE_PACKING_ADJUSTMENT,
79+
BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
80+
case file:pread(FileHdl, Offset, TotalSize) of
81+
{ok, <<Size:?INTEGER_SIZE_BITS,
82+
MsgId:?MSG_ID_SIZE_BYTES/binary,
83+
MsgBodyBin:BodyBinSize/binary,
84+
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
85+
{ok, {MsgId, binary_to_term(MsgBodyBin)}};
86+
KO -> KO
87+
end.
88+
89+
-spec pread(io_device(), [{position(), msg_size()}]) ->
90+
{ok, [msg()]} | {error, any()} | eof.
91+
92+
pread(FileHdl, LocNums) ->
93+
case file:pread(FileHdl, LocNums) of
94+
{ok, DataL} -> {ok, pread_parse(DataL)};
95+
KO -> KO
96+
end.
97+
98+
pread_parse([<<Size:?INTEGER_SIZE_BITS,
99+
_MsgId:?MSG_ID_SIZE_BYTES/binary,
100+
Rest0/bits>>|Tail]) ->
101+
BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
102+
<<MsgBodyBin:BodyBinSize/binary,
103+
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS,
104+
Rest/bits>> = Rest0,
105+
[binary_to_term(MsgBodyBin)|pread_parse([Rest|Tail])];
106+
pread_parse([<<>>]) ->
107+
[];
108+
pread_parse([<<>>|Tail]) ->
109+
pread_parse(Tail).
110+
70111
-spec scan(io_device(), file_size(), message_accumulator(A), A) ->
71112
{'ok', A, position()}.
72113

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 646 additions & 964 deletions
Large diffs are not rendered by default.

deps/rabbit/src/rabbit_msg_store_ets_index.erl

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
-behaviour(rabbit_msg_store_index).
1313

1414
-export([new/1, recover/1,
15-
lookup/2, insert/2, update/2, update_fields/3, delete/2,
15+
lookup/2, select_from_file/3, select_all_from_file/2, insert/2, update/2, update_fields/3, delete/2,
1616
delete_object/2, clean_up_temporary_reference_count_entries_without_file/1, terminate/1]).
1717

1818
-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
@@ -42,6 +42,18 @@ lookup(Key, State) ->
4242
[Entry] -> Entry
4343
end.
4444

45+
%% @todo We currently fetch all and then filter by file.
46+
%% This might lead to too many lookups... How to best
47+
%% optimize this? ets:select didn't seem great.
48+
select_from_file(MsgIds, File, State) ->
49+
All = [lookup(Id, State) || Id <- MsgIds],
50+
[MsgLoc || MsgLoc=#msg_location{file=MsgFile} <- All, MsgFile =:= File].
51+
52+
%% Note that this function is not terribly efficient and should only be
53+
%% used for compaction or similar.
54+
select_all_from_file(File, State) ->
55+
ets:match_object(State #state.table, #msg_location { file = File, _ = '_' }).
56+
4557
insert(Obj, State) ->
4658
true = ets:insert_new(State #state.table, Obj),
4759
ok.

deps/rabbit/src/rabbit_msg_store_gc.erl

Lines changed: 64 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@
99

1010
-behaviour(gen_server2).
1111

12-
-export([start_link/1, combine/3, delete/2, no_readers/2, stop/1]).
12+
-export([start_link/1, compact/2, truncate/4, delete/2, stop/1]).
1313

1414
-export([set_maximum_since_use/2]).
1515

1616
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
1717
terminate/2, code_change/3, prioritise_cast/3]).
1818

1919
-record(state,
20-
{ pending_no_readers,
21-
on_action,
20+
{ pending,
21+
timer_ref,
2222
msg_store_state
2323
}).
2424

@@ -33,22 +33,21 @@ start_link(MsgStoreState) ->
3333
gen_server2:start_link(?MODULE, [MsgStoreState],
3434
[{timeout, infinity}]).
3535

36-
-spec combine(pid(), rabbit_msg_store:file_num(),
37-
rabbit_msg_store:file_num()) -> 'ok'.
36+
-spec compact(pid(), rabbit_msg_store:file_num()) -> 'ok'.
3837

39-
combine(Server, Source, Destination) ->
40-
gen_server2:cast(Server, {combine, Source, Destination}).
38+
compact(Server, File) ->
39+
gen_server2:cast(Server, {compact, File}).
40+
41+
-spec truncate(pid(), rabbit_msg_store:file_num(), non_neg_integer(), integer()) -> 'ok'.
42+
43+
truncate(Server, File, TruncateSize, ThresholdTimestamp) ->
44+
gen_server2:cast(Server, {truncate, File, TruncateSize, ThresholdTimestamp}).
4145

4246
-spec delete(pid(), rabbit_msg_store:file_num()) -> 'ok'.
4347

4448
delete(Server, File) ->
4549
gen_server2:cast(Server, {delete, File}).
4650

47-
-spec no_readers(pid(), rabbit_msg_store:file_num()) -> 'ok'.
48-
49-
no_readers(Server, File) ->
50-
gen_server2:cast(Server, {no_readers, File}).
51-
5251
-spec stop(pid()) -> 'ok'.
5352

5453
stop(Server) ->
@@ -64,8 +63,7 @@ set_maximum_since_use(Pid, Age) ->
6463
init([MsgStoreState]) ->
6564
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
6665
[self()]),
67-
{ok, #state { pending_no_readers = #{},
68-
on_action = [],
66+
{ok, #state { pending = #{},
6967
msg_store_state = MsgStoreState }, hibernate,
7068
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
7169

@@ -75,28 +73,44 @@ prioritise_cast(_Msg, _Len, _State) -> 0.
7573
handle_call(stop, _From, State) ->
7674
{stop, normal, ok, State}.
7775

78-
handle_cast({combine, Source, Destination}, State) ->
79-
{noreply, attempt_action(combine, [Source, Destination], State), hibernate};
80-
81-
handle_cast({delete, File}, State) ->
82-
{noreply, attempt_action(delete, [File], State), hibernate};
83-
84-
handle_cast({no_readers, File},
85-
State = #state { pending_no_readers = Pending }) ->
86-
{noreply, case maps:find(File, Pending) of
87-
error ->
88-
State;
89-
{ok, {Action, Files}} ->
90-
Pending1 = maps:remove(File, Pending),
91-
attempt_action(
92-
Action, Files,
93-
State #state { pending_no_readers = Pending1 })
94-
end, hibernate};
76+
handle_cast({compact, File}, State) ->
77+
%% Since we don't compact files that have a valid size of 0,
78+
%% we cannot have a delete queued at the same time as we are
79+
%% asked to compact. We can always compact.
80+
{noreply, attempt_action(compact, [File], State), hibernate};
81+
82+
handle_cast({truncate, File, TruncateSize, ThresholdTimestamp}, State = #state{pending = Pending}) ->
83+
case Pending of
84+
%% No need to truncate if we are going to delete.
85+
#{File := {delete, _}} ->
86+
{noreply, State, hibernate};
87+
%% Attempt to truncate otherwise. If a truncate was already
88+
%% scheduled we drop it in favor of the new truncate.
89+
_ ->
90+
State1 = State#state{pending = maps:remove(File, Pending)},
91+
{noreply, attempt_action(truncate, [File, TruncateSize, ThresholdTimestamp], State1), hibernate}
92+
end;
93+
94+
handle_cast({delete, File}, State = #state{pending = Pending}) ->
95+
%% We drop any pending action because deletion takes precedence over truncation.
96+
State1 = State#state{pending = maps:remove(File, Pending)},
97+
{noreply, attempt_action(delete, [File], State1), hibernate};
9598

9699
handle_cast({set_maximum_since_use, Age}, State) ->
97100
ok = file_handle_cache:set_maximum_since_use(Age),
98101
{noreply, State, hibernate}.
99102

103+
%% Run all pending actions.
104+
handle_info({timeout, TimerRef, do_pending},
105+
State = #state{ pending = Pending,
106+
timer_ref = TimerRef }) ->
107+
State1 = State#state{ pending = #{},
108+
timer_ref = undefined },
109+
State2 = maps:fold(fun(_File, {Action, Args}, StateFold) ->
110+
attempt_action(Action, Args, StateFold)
111+
end, State1, Pending),
112+
{noreply, State2, hibernate};
113+
100114
handle_info(Info, State) ->
101115
{stop, {unhandled_info, Info}, State}.
102116

@@ -106,20 +120,27 @@ terminate(_Reason, State) ->
106120
code_change(_OldVsn, State, _Extra) ->
107121
{ok, State}.
108122

109-
attempt_action(Action, Files,
110-
State = #state { pending_no_readers = Pending,
111-
on_action = Thunks,
123+
attempt_action(Action, Args,
124+
State = #state { pending = Pending,
112125
msg_store_state = MsgStoreState }) ->
113-
case do_action(Action, Files, MsgStoreState) of
114-
{ok, OkThunk} ->
115-
State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end,
116-
[OkThunk | Thunks])};
117-
{defer, [File | _]} ->
118-
Pending1 = maps:put(File, {Action, Files}, Pending),
119-
State #state { pending_no_readers = Pending1 }
126+
case do_action(Action, Args, MsgStoreState) of
127+
ok ->
128+
State;
129+
defer ->
130+
[File|_] = Args,
131+
Pending1 = maps:put(File, {Action, Args}, Pending),
132+
ensure_pending_timer(State #state { pending = Pending1 })
120133
end.
121134

122-
do_action(combine, [Source, Destination], MsgStoreState) ->
123-
rabbit_msg_store:combine_files(Source, Destination, MsgStoreState);
135+
do_action(compact, [File], MsgStoreState) ->
136+
rabbit_msg_store:compact_file(File, MsgStoreState);
137+
do_action(truncate, [File, Size, ThresholdTimestamp], MsgStoreState) ->
138+
rabbit_msg_store:truncate_file(File, Size, ThresholdTimestamp, MsgStoreState);
124139
do_action(delete, [File], MsgStoreState) ->
125140
rabbit_msg_store:delete_file(File, MsgStoreState).
141+
142+
ensure_pending_timer(State = #state{timer_ref = undefined}) ->
143+
TimerRef = erlang:start_timer(5000, self(), do_pending),
144+
State#state{timer_ref = TimerRef};
145+
ensure_pending_timer(State) ->
146+
State.

0 commit comments

Comments
 (0)