Skip to content

Commit d8ec9ef

Browse files
acogoluegnesmergify[bot]
authored andcommitted
Rename StreamInfo to StreamStats
Other changes: returns a map of int64, use the new osiris:get_stats/1 API. References #5412 (cherry picked from commit 93c33f2)
1 parent 390309f commit d8ec9ef

File tree

7 files changed

+59
-66
lines changed

7 files changed

+59
-66
lines changed

deps/rabbitmq_stream/docs/PROTOCOL.adoc

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ used to make the difference between a request (0) and a response (1). Example fo
226226
|0x001b
227227
|Yes
228228

229-
|<<streaminfo>>
229+
|<<streamstats>>
230230
|Client
231231
|0x001c
232232
|Yes
@@ -375,7 +375,7 @@ Deliver => Key Version SubscriptionId CommittedOffset OsirisChunk
375375
Key => uint16 // 0x0008
376376
Version => uint16
377377
SubscriptionId => uint8
378-
CommittedOffset => uint64
378+
CommittedChunkId => uint64
379379
OsirisChunk => MagicVersion NumEntries NumRecords Epoch ChunkFirstOffset ChunkCrc DataLength Messages
380380
MagicVersion => int8
381381
ChunkType => int8 // 0: user, 1: tracking delta, 2: tracking snapshot
@@ -707,24 +707,24 @@ CommandVersionsExchangeResponse => Key Version CorrelationId ResponseCode [Comma
707707
MaxVersion => uint16
708708
```
709709

710-
=== StreamInfo
710+
=== StreamStats
711711

712712
```
713-
StreamInfoRequest => Key Version CorrelationId Stream
713+
StreamStatsRequest => Key Version CorrelationId Stream
714714
Key => uint16 // 0x001c
715715
Version => uint16
716716
CorrelationId => uint32
717717
Stream => string
718718

719-
StreamInfoResponse => Key Version CorrelationId ResponseCode Info
719+
StreamStatsResponse => Key Version CorrelationId ResponseCode Stats
720720
Key => uint16 // 0x801c
721721
Version => uint16
722722
CorrelationId => uint32
723723
ResponseCode => uint16
724-
Info => [PieceOfInfo]
725-
PieceOfInfo => Key Value
724+
Stats => [Statistic]
725+
Statistic => Key Value
726726
Key => string
727-
Value => string
727+
Value => int64
728728
```
729729

730730

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2646,7 +2646,7 @@ handle_frame_post_auth(Transport,
26462646
user = User} =
26472647
Connection,
26482648
State,
2649-
{request, CorrelationId, {stream_info, Stream}}) ->
2649+
{request, CorrelationId, {stream_stats, Stream}}) ->
26502650
QueueResource =
26512651
#resource{name = Stream,
26522652
kind = queue,
@@ -2661,38 +2661,28 @@ handle_frame_post_auth(Transport,
26612661
rabbit_global_counters:increase_protocol_counter(stream,
26622662
?STREAM_NOT_AVAILABLE,
26632663
1),
2664-
{stream_info, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE, #{}};
2664+
{stream_stats, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE,
2665+
#{}};
26652666
{error, not_found} ->
26662667
rabbit_global_counters:increase_protocol_counter(stream,
26672668
?STREAM_DOES_NOT_EXIST,
26682669
1),
2669-
{stream_info, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST,
2670+
{stream_stats, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST,
26702671
#{}};
26712672
{ok, MemberPid} ->
2672-
OffsetInfo =
2673-
case gen:call(MemberPid, '$gen_call',
2674-
get_reader_context)
2675-
of
2676-
{ok, #{offset_ref := undefined}} ->
2677-
#{};
2678-
{ok, #{offset_ref := OffsetRef}} ->
2679-
#{<<"first_offset">> =>
2680-
rabbit_data_coercion:to_binary(
2681-
atomics:get(OffsetRef, 2)),
2682-
<<"committed_offset">> =>
2683-
rabbit_data_coercion:to_binary(
2684-
atomics:get(OffsetRef, 1))};
2685-
_ ->
2686-
#{}
2687-
end,
2688-
2689-
{stream_info, ?RESPONSE_CODE_OK, OffsetInfo}
2673+
StreamStats =
2674+
maps:fold(fun(K, V, Acc) ->
2675+
Acc#{rabbit_data_coercion:to_binary(K)
2676+
=> V}
2677+
end,
2678+
#{}, osiris:get_stats(MemberPid)),
2679+
{stream_stats, ?RESPONSE_CODE_OK, StreamStats}
26902680
end;
26912681
error ->
26922682
rabbit_global_counters:increase_protocol_counter(stream,
26932683
?ACCESS_REFUSED,
26942684
1),
2695-
{stream_info, ?RESPONSE_CODE_ACCESS_REFUSED, #{}}
2685+
{stream_stats, ?RESPONSE_CODE_ACCESS_REFUSED, #{}}
26962686
end,
26972687
Frame = rabbit_stream_core:frame({response, CorrelationId, Response}),
26982688
send(Transport, S, Frame),
@@ -3243,7 +3233,7 @@ send_file_callback(?VERSION_2,
32433233
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
32443234
Size) ->
32453235
FrameSize = 2 + 2 + 1 + 8 + Size,
3246-
CommittedOffset =
3236+
CommittedChunkId =
32473237
case osiris_log:committed_offset(Log) of
32483238
undefined -> 0;
32493239
R -> R
@@ -3254,7 +3244,7 @@ send_file_callback(?VERSION_2,
32543244
?COMMAND_DELIVER:15,
32553245
?VERSION_2:16,
32563246
SubscriptionId:8/unsigned,
3257-
CommittedOffset:64>>,
3247+
CommittedChunkId:64>>,
32583248
Transport:send(S, FrameBeginning),
32593249
atomics:add(Counter, 1, Size),
32603250
increase_messages_consumed(Counters, NumEntries),

deps/rabbitmq_stream/src/rabbit_stream_utils.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,4 +276,4 @@ command_versions() ->
276276
{heartbeat, ?VERSION_1, ?VERSION_1},
277277
{route, ?VERSION_1, ?VERSION_1},
278278
{partitions, ?VERSION_1, ?VERSION_1},
279-
{stream_info, ?VERSION_1, ?VERSION_1}].
279+
{stream_stats, ?VERSION_1, ?VERSION_1}].

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ test_server(Transport, Config) ->
440440
C12 = test_deliver_v2(Transport, S, SubscriptionId2, 0, Body, C11),
441441
C13 = test_deliver_v2(Transport, S, SubscriptionId2, 1, Body, C12),
442442

443-
C14 = test_stream_info(Transport, S, Stream, C13),
443+
C14 = test_stream_stats(Transport, S, Stream, C13),
444444

445445
C15 = test_delete_stream(Transport, S, Stream, C14),
446446
_C16 = test_close(Transport, S, C15),
@@ -622,15 +622,15 @@ test_exchange_command_versions(Transport, S, C0) ->
622622
Cmd),
623623
C.
624624

625-
test_stream_info(Transport, S, Stream, C0) ->
626-
SICmd = {request, 1, {stream_info, Stream}},
625+
test_stream_stats(Transport, S, Stream, C0) ->
626+
SICmd = {request, 1, {stream_stats, Stream}},
627627
SIFrame = rabbit_stream_core:frame(SICmd),
628628
ok = Transport:send(S, SIFrame),
629629
{Cmd, C} = receive_commands(Transport, S, C0),
630630
?assertEqual({response, 1,
631-
{stream_info, ?RESPONSE_CODE_OK,
632-
#{<<"committed_offset">> => <<"1">>,
633-
<<"first_offset">> => <<"0">>}}},
631+
{stream_stats, ?RESPONSE_CODE_OK,
632+
#{<<"first_chunk_id">> => 0,
633+
<<"committed_chunk_id">> => 1}}},
634634
Cmd),
635635
C.
636636

deps/rabbitmq_stream_common/include/rabbit_stream.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
-define(COMMAND_PARTITIONS, 25).
2626
-define(COMMAND_CONSUMER_UPDATE, 26).
2727
-define(COMMAND_EXCHANGE_COMMAND_VERSIONS, 27).
28-
-define(COMMAND_STREAM_INFO, 28).
28+
-define(COMMAND_STREAM_STATS, 28).
2929

3030
-define(REQUEST, 0).
3131
-define(RESPONSE, 1).

deps/rabbitmq_stream_common/src/rabbit_stream_core.erl

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@
7575
{deliver, subscription_id(), Chunk :: binary()} |
7676
{deliver_v2,
7777
subscription_id(),
78-
CommittedOffset :: osiris:offset(),
78+
CommittedChunkId :: osiris:offset(),
7979
Chunk :: binary()} |
8080
{credit, subscription_id(), Credit :: non_neg_integer()} |
8181
{metadata_update, stream_name(), response_code()} |
@@ -109,7 +109,7 @@
109109
{exchange_command_versions,
110110
[{Command :: atom(), MinVersion :: command_version(),
111111
MaxVersion :: command_version()}]} |
112-
{stream_info, Stream :: binary()}} |
112+
{stream_stats, Stream :: binary()}} |
113113
{response, correlation_id(),
114114
{declare_publisher |
115115
delete_publisher |
@@ -140,7 +140,7 @@
140140
{exchange_command_versions, response_code(),
141141
[{Command :: atom(), MinVersion :: command_version(),
142142
MaxVersion :: command_version()}]} |
143-
{stream_info, response_code(), Info :: #{binary() => binary()}}} |
143+
{stream_stats, response_code(), Stats :: #{binary() => integer()}}} |
144144
{unknown, binary()}.
145145

146146
-spec init(term()) -> state().
@@ -245,12 +245,12 @@ frame({deliver, SubscriptionId, Chunk}) ->
245245
?VERSION_1:16,
246246
SubscriptionId:8>>,
247247
Chunk]);
248-
frame({deliver_v2, SubscriptionId, CommittedOffset, Chunk}) ->
248+
frame({deliver_v2, SubscriptionId, CommittedChunkId, Chunk}) ->
249249
wrap_in_frame([<<?REQUEST:1,
250250
?COMMAND_DELIVER:15,
251251
?VERSION_2:16,
252252
SubscriptionId:8,
253-
CommittedOffset:64>>,
253+
CommittedChunkId:64>>,
254254
Chunk]);
255255
frame({metadata_update, Stream, ResponseCode}) ->
256256
StreamSize = byte_size(Stream),
@@ -474,19 +474,17 @@ response_body({exchange_command_versions = Tag, Code,
474474
[], CommandVersions),
475475
{command_id(Tag),
476476
[<<Code:16, (length(CommandVersions)):32>>, CommandVersionsBin]};
477-
response_body({stream_info = Tag, Code, Info}) ->
478-
Init = <<Code:16, (maps:size(Info)):32>>,
477+
response_body({stream_stats = Tag, Code, Stats}) ->
478+
Init = <<Code:16, (maps:size(Stats)):32>>,
479479
{command_id(Tag),
480480
maps:fold(fun(Key, Value, Acc) ->
481481
KeySize = byte_size(Key),
482-
ValueSize = byte_size(Value),
483482
<<Acc/binary,
484483
KeySize:16,
485484
Key:KeySize/binary,
486-
ValueSize:16,
487-
Value:ValueSize/binary>>
485+
Value:64/signed>>
488486
end,
489-
Init, Info)}.
487+
Init, Stats)}.
490488

491489
request_body({declare_publisher = Tag,
492490
PublisherId,
@@ -592,7 +590,7 @@ request_body({exchange_command_versions = Tag, CommandVersions}) ->
592590
[], CommandVersions),
593591
CommandVersionsLength = length(CommandVersions),
594592
{Tag, [<<CommandVersionsLength:32>>, CommandVersionsBin]};
595-
request_body({stream_info = Tag, Stream}) ->
593+
request_body({stream_stats = Tag, Stream}) ->
596594
{Tag, <<?STRING(Stream)>>}.
597595

598596
append_data(Prev, Data) when is_binary(Prev) ->
@@ -639,9 +637,9 @@ parse_request(<<?REQUEST:1,
639637
?COMMAND_DELIVER:15,
640638
?VERSION_2:16,
641639
SubscriptionId:8,
642-
CommittedOffset:64,
640+
CommittedChunkId:64,
643641
Chunk/binary>>) ->
644-
{deliver_v2, SubscriptionId, CommittedOffset, Chunk};
642+
{deliver_v2, SubscriptionId, CommittedChunkId, Chunk};
645643
parse_request(<<?REQUEST:1,
646644
?COMMAND_CREDIT:15,
647645
?VERSION_1:16,
@@ -861,11 +859,11 @@ parse_request(<<?REQUEST:1,
861859
CommandVersions = parse_command_versions(CommandVersionsBin),
862860
request(CorrelationId, {exchange_command_versions, CommandVersions});
863861
parse_request(<<?REQUEST:1,
864-
?COMMAND_STREAM_INFO:15,
862+
?COMMAND_STREAM_STATS:15,
865863
?VERSION_1:16,
866864
CorrelationId:32,
867865
?STRING(StreamSize, Stream)>>) ->
868-
request(CorrelationId, {stream_info, Stream});
866+
request(CorrelationId, {stream_stats, Stream});
869867
parse_request(Bin) ->
870868
{unknown, Bin}.
871869

@@ -952,10 +950,10 @@ parse_response_body(?COMMAND_EXCHANGE_COMMAND_VERSIONS,
952950
CommandVersionsBin/binary>>) ->
953951
CommandVersions = parse_command_versions(CommandVersionsBin),
954952
{exchange_command_versions, ResponseCode, CommandVersions};
955-
parse_response_body(?COMMAND_STREAM_INFO,
956-
<<ResponseCode:16, _Count:32, InfoBin/binary>>) ->
957-
Info = parse_map(InfoBin, #{}),
958-
{stream_info, ResponseCode, Info}.
953+
parse_response_body(?COMMAND_STREAM_STATS,
954+
<<ResponseCode:16, _Count:32, StatsBin/binary>>) ->
955+
Info = parse_int_map(StatsBin, #{}),
956+
{stream_stats, ResponseCode, Info}.
959957

960958
offset_spec(OffsetType, OffsetValueBin) ->
961959
case OffsetType of
@@ -1027,6 +1025,11 @@ parse_map(<<?STRING(KeySize, Key), ?STRING(ValSize, Value),
10271025
Acc) ->
10281026
parse_map(Rem, Acc#{Key => Value}).
10291027

1028+
parse_int_map(<<>>, Acc) ->
1029+
Acc;
1030+
parse_int_map(<<?STRING(KeySize, Key), Value:64, Rem/binary>>, Acc) ->
1031+
parse_int_map(Rem, Acc#{Key => Value}).
1032+
10301033
generate_map(Map) ->
10311034
maps:fold(fun(K, V, Acc) -> [<<?STRING(K), ?STRING(V)>> | Acc] end,
10321035
[], Map).
@@ -1107,8 +1110,8 @@ command_id(consumer_update) ->
11071110
?COMMAND_CONSUMER_UPDATE;
11081111
command_id(exchange_command_versions) ->
11091112
?COMMAND_EXCHANGE_COMMAND_VERSIONS;
1110-
command_id(stream_info) ->
1111-
?COMMAND_STREAM_INFO.
1113+
command_id(stream_stats) ->
1114+
?COMMAND_STREAM_STATS.
11121115

11131116
parse_command_id(?COMMAND_DECLARE_PUBLISHER) ->
11141117
declare_publisher;
@@ -1164,8 +1167,8 @@ parse_command_id(?COMMAND_CONSUMER_UPDATE) ->
11641167
consumer_update;
11651168
parse_command_id(?COMMAND_EXCHANGE_COMMAND_VERSIONS) ->
11661169
exchange_command_versions;
1167-
parse_command_id(?COMMAND_STREAM_INFO) ->
1168-
stream_info.
1170+
parse_command_id(?COMMAND_STREAM_STATS) ->
1171+
stream_stats.
11691172

11701173
element_index(Element, List) ->
11711174
element_index(Element, List, 0).

deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ roundtrip(_Config) ->
108108
test_roundtrip({request, 99,
109109
{exchange_command_versions,
110110
[{deliver, ?VERSION_1, ?VERSION_1}]}}),
111-
test_roundtrip({request, 99, {stream_info, <<"stream_name">>}}),
111+
test_roundtrip({request, 99, {stream_stats, <<"stream_name">>}}),
112112

113113
%% RESPONSES
114114
[test_roundtrip({response, 99, {Tag, 53}})
@@ -143,7 +143,7 @@ roundtrip(_Config) ->
143143
{exchange_command_versions, 1,
144144
[{publish, ?VERSION_1, ?VERSION_1}]}}),
145145
test_roundtrip({response, 99,
146-
{stream_info, 1, #{<<"committed_offset">> => <<"42">>}}}),
146+
{stream_stats, 1, #{<<"committed_offset">> => 42}}}),
147147
ok.
148148

149149
roundtrip_metadata(_Config) ->

0 commit comments

Comments
 (0)