Skip to content

Commit 8687e73

Browse files
committed
Add StreamInfo command to stream protocol
It returns general information on a stream, the first and committed offsets for now. Fixes #5412
1 parent 589ed43 commit 8687e73

File tree

9 files changed

+278
-110
lines changed

9 files changed

+278
-110
lines changed

deps/rabbitmq_stream/docs/PROTOCOL.adoc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,11 @@ used to make the difference between a request (0) and a response (1). Example fo
226226
|0x001b
227227
|Yes
228228

229+
|<<streaminfo>>
230+
|Client
231+
|0x001c
232+
|Yes
233+
229234
|===
230235

231236
=== DeclarePublisher
@@ -702,6 +707,26 @@ CommandVersionsExchangeResponse => Key Version CorrelationId ResponseCode [Comma
702707
MaxVersion => uint16
703708
```
704709

710+
=== StreamInfo
711+
712+
```
713+
StreamInfoRequest => Key Version CorrelationId Stream
714+
Key => uint16 // 0x001c
715+
Version => uint16
716+
CorrelationId => uint32
717+
Stream => string
718+
719+
StreamInfoResponse => Key Version CorrelationId ResponseCode Info
720+
Key => uint16 // 0x801c
721+
Version => uint16
722+
CorrelationId => uint32
723+
ResponseCode => uint16
724+
Info => [PieceOfInfo]
725+
PieceOfInfo => Key Value
726+
Key => string
727+
Value => string
728+
```
729+
705730

706731
== Authentication
707732

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 111 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
delete_super_stream/3,
3434
lookup_leader/2,
3535
lookup_local_member/2,
36+
lookup_member/2,
3637
topology/2,
3738
route/3,
3839
partitions/2,
@@ -100,11 +101,17 @@ lookup_leader(VirtualHost, Stream) ->
100101
lookup_local_member(VirtualHost, Stream) ->
101102
gen_server:call(?MODULE, {lookup_local_member, VirtualHost, Stream}).
102103

104+
-spec lookup_member(binary(), binary()) ->
105+
{ok, pid()} | {error, not_found} |
106+
{error, not_available}.
107+
lookup_member(VirtualHost, Stream) ->
108+
gen_server:call(?MODULE, {lookup_member, VirtualHost, Stream}).
109+
103110
-spec topology(binary(), binary()) ->
104111
{ok,
105112
#{leader_node => undefined | pid(),
106113
replica_nodes => [pid()]}} |
107-
{error, stream_not_found} | {error, stream_not_available}.
114+
{error, not_found} | {error, not_available}.
108115
topology(VirtualHost, Stream) ->
109116
gen_server:call(?MODULE, {topology, VirtualHost, Stream}).
110117

@@ -292,119 +299,105 @@ handle_call({delete_super_stream, VirtualHost, SuperStream, Username},
292299
{reply, {error, Error}, State}
293300
end;
294301
handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
295-
Name =
296-
#resource{virtual_host = VirtualHost,
297-
kind = queue,
298-
name = Stream},
299-
Res = case rabbit_amqqueue:lookup(Name) of
302+
Res = case lookup_stream(VirtualHost, Stream) of
300303
{ok, Q} ->
301-
case is_stream_queue(Q) of
304+
LeaderPid = amqqueue:get_pid(Q),
305+
case process_alive(LeaderPid) of
302306
true ->
303-
LeaderPid = amqqueue:get_pid(Q),
304-
case process_alive(LeaderPid) of
305-
true ->
306-
{ok, LeaderPid};
307-
false ->
308-
case leader_from_members(Q) of
309-
{ok, Pid} ->
310-
{ok, Pid};
311-
_ ->
312-
{error, not_available}
313-
end
314-
end;
315-
_ ->
316-
{error, not_found}
307+
{ok, LeaderPid};
308+
false ->
309+
case leader_from_members(Q) of
310+
{ok, Pid} ->
311+
{ok, Pid};
312+
_ ->
313+
{error, not_available}
314+
end
317315
end;
318-
{error, not_found} ->
319-
case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
320-
not_found ->
321-
{error, not_found};
322-
_ ->
323-
{error, not_available}
324-
end
316+
R ->
317+
R
325318
end,
326319
{reply, Res, State};
327320
handle_call({lookup_local_member, VirtualHost, Stream}, _From,
328321
State) ->
329-
Name =
330-
#resource{virtual_host = VirtualHost,
331-
kind = queue,
332-
name = Stream},
333-
Res = case rabbit_amqqueue:lookup(Name) of
322+
Res = case lookup_stream(VirtualHost, Stream) of
334323
{ok, Q} ->
335-
case is_stream_queue(Q) of
336-
true ->
337-
#{name := StreamName} = amqqueue:get_type_state(Q),
338-
% FIXME check if pid is alive in case of stale information
339-
case rabbit_stream_coordinator:local_pid(StreamName)
340-
of
341-
{ok, Pid} when is_pid(Pid) ->
342-
{ok, Pid};
343-
{error, timeout} ->
344-
{error, not_available};
345-
_ ->
346-
{error, not_available}
347-
end;
348-
_ ->
349-
{error, not_found}
350-
end;
351-
{error, not_found} ->
352-
case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
353-
not_found ->
354-
{error, not_found};
324+
#{name := StreamName} = amqqueue:get_type_state(Q),
325+
% FIXME check if pid is alive in case of stale information
326+
case rabbit_stream_coordinator:local_pid(StreamName) of
327+
{ok, Pid} when is_pid(Pid) ->
328+
{ok, Pid};
329+
{error, timeout} ->
330+
{error, not_available};
355331
_ ->
356332
{error, not_available}
357-
end
333+
end;
334+
R ->
335+
R
358336
end,
359337
{reply, Res, State};
360-
handle_call({topology, VirtualHost, Stream}, _From, State) ->
361-
Name =
362-
#resource{virtual_host = VirtualHost,
363-
kind = queue,
364-
name = Stream},
365-
Res = case rabbit_amqqueue:lookup(Name) of
338+
handle_call({lookup_member, VirtualHost, Stream}, _From, State) ->
339+
Res = case lookup_stream(VirtualHost, Stream) of
366340
{ok, Q} ->
367-
case is_stream_queue(Q) of
368-
true ->
369-
QState = amqqueue:get_type_state(Q),
370-
#{name := StreamName} = QState,
341+
#{name := StreamName} = amqqueue:get_type_state(Q),
342+
% FIXME check if pid is alive in case of stale information
343+
case rabbit_stream_coordinator:local_pid(StreamName) of
344+
{ok, Pid} when is_pid(Pid) ->
345+
{ok, Pid};
346+
_ ->
371347
case rabbit_stream_coordinator:members(StreamName) of
372348
{ok, Members} ->
373-
{ok,
374-
maps:fold(fun (_Node, {undefined, _Role},
375-
Acc) ->
376-
Acc;
377-
(LeaderNode, {_Pid, writer},
378-
Acc) ->
379-
Acc#{leader_node =>
380-
LeaderNode};
381-
(ReplicaNode, {_Pid, replica},
382-
Acc) ->
383-
#{replica_nodes :=
384-
ReplicaNodes} =
385-
Acc,
386-
Acc#{replica_nodes =>
387-
ReplicaNodes
388-
++ [ReplicaNode]};
389-
(_Node, _, Acc) ->
390-
Acc
391-
end,
392-
#{leader_node => undefined,
393-
replica_nodes => []},
394-
Members)};
349+
case lists:search(fun ({undefined, _Role}) ->
350+
false;
351+
({P, _Role})
352+
when is_pid(P) ->
353+
is_process_alive(P);
354+
(_) ->
355+
false
356+
end,
357+
maps:values(Members))
358+
of
359+
{value, {Pid, _Role}} ->
360+
{ok, Pid};
361+
_ ->
362+
{error, not_available}
363+
end;
395364
_ ->
396-
{error, stream_not_available}
397-
end;
398-
_ ->
399-
{error, stream_not_found}
365+
{error, not_available}
366+
end
400367
end;
401-
{error, not_found} ->
402-
case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
403-
not_found ->
404-
{error, stream_not_found};
368+
R ->
369+
R
370+
end,
371+
{reply, Res, State};
372+
handle_call({topology, VirtualHost, Stream}, _From, State) ->
373+
Res = case lookup_stream(VirtualHost, Stream) of
374+
{ok, Q} ->
375+
QState = amqqueue:get_type_state(Q),
376+
#{name := StreamName} = QState,
377+
case rabbit_stream_coordinator:members(StreamName) of
378+
{ok, Members} ->
379+
{ok,
380+
maps:fold(fun (_Node, {undefined, _Role}, Acc) ->
381+
Acc;
382+
(LeaderNode, {_Pid, writer}, Acc) ->
383+
Acc#{leader_node => LeaderNode};
384+
(ReplicaNode, {_Pid, replica}, Acc) ->
385+
#{replica_nodes := ReplicaNodes} =
386+
Acc,
387+
Acc#{replica_nodes =>
388+
ReplicaNodes
389+
++ [ReplicaNode]};
390+
(_Node, _, Acc) ->
391+
Acc
392+
end,
393+
#{leader_node => undefined,
394+
replica_nodes => []},
395+
Members)};
405396
_ ->
406-
{error, stream_not_available}
407-
end
397+
{error, not_available}
398+
end;
399+
R ->
400+
R
408401
end,
409402
{reply, Res, State};
410403
handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
@@ -837,6 +830,28 @@ delete_super_stream_exchange(VirtualHost, Name, Username) ->
837830
{error, validation_failed}
838831
end.
839832

833+
lookup_stream(VirtualHost, Stream) ->
834+
Name =
835+
#resource{virtual_host = VirtualHost,
836+
kind = queue,
837+
name = Stream},
838+
case rabbit_amqqueue:lookup(Name) of
839+
{ok, Q} ->
840+
case is_stream_queue(Q) of
841+
true ->
842+
{ok, Q};
843+
_ ->
844+
{error, not_found}
845+
end;
846+
{error, not_found} ->
847+
case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
848+
not_found ->
849+
{error, not_found};
850+
_ ->
851+
{error, not_available}
852+
end
853+
end.
854+
840855
leader_from_members(Q) ->
841856
QState = amqqueue:get_type_state(Q),
842857
#{name := StreamName} = QState,

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1734,7 +1734,6 @@ handle_frame_post_auth(Transport,
17341734
State,
17351735
{request, CorrelationId,
17361736
{query_publisher_sequence, Reference, Stream}}) ->
1737-
% FrameSize = ?RESPONSE_FRAME_SIZE + 8,
17381737
{ResponseCode, Sequence} =
17391738
case rabbit_stream_utils:check_read_permitted(#resource{name = Stream,
17401739
kind = queue,
@@ -2641,6 +2640,63 @@ handle_frame_post_auth(Transport,
26412640
Connection1 =
26422641
process_client_command_versions(Connection0, CommandVersions),
26432642
{Connection1, State};
2643+
handle_frame_post_auth(Transport,
2644+
#stream_connection{socket = S,
2645+
virtual_host = VirtualHost,
2646+
user = User} =
2647+
Connection,
2648+
State,
2649+
{request, CorrelationId, {stream_info, Stream}}) ->
2650+
QueueResource =
2651+
#resource{name = Stream,
2652+
kind = queue,
2653+
virtual_host = VirtualHost},
2654+
Response =
2655+
case rabbit_stream_utils:check_read_permitted(QueueResource, User,
2656+
#{})
2657+
of
2658+
ok ->
2659+
case rabbit_stream_manager:lookup_member(VirtualHost, Stream) of
2660+
{error, not_available} ->
2661+
rabbit_global_counters:increase_protocol_counter(stream,
2662+
?STREAM_NOT_AVAILABLE,
2663+
1),
2664+
{stream_info, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE, #{}};
2665+
{error, not_found} ->
2666+
rabbit_global_counters:increase_protocol_counter(stream,
2667+
?STREAM_DOES_NOT_EXIST,
2668+
1),
2669+
{stream_info, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST,
2670+
#{}};
2671+
{ok, MemberPid} ->
2672+
OffsetInfo =
2673+
case gen:call(MemberPid, '$gen_call',
2674+
get_reader_context)
2675+
of
2676+
{ok, #{offset_ref := undefined}} ->
2677+
#{};
2678+
{ok, #{offset_ref := OffsetRef}} ->
2679+
#{<<"first_offset">> =>
2680+
rabbit_data_coercion:to_binary(
2681+
atomics:get(OffsetRef, 2)),
2682+
<<"committed_offset">> =>
2683+
rabbit_data_coercion:to_binary(
2684+
atomics:get(OffsetRef, 1))};
2685+
_ ->
2686+
#{}
2687+
end,
2688+
2689+
{stream_info, ?RESPONSE_CODE_OK, OffsetInfo}
2690+
end;
2691+
error ->
2692+
rabbit_global_counters:increase_protocol_counter(stream,
2693+
?ACCESS_REFUSED,
2694+
1),
2695+
{stream_info, ?RESPONSE_CODE_ACCESS_REFUSED, #{}}
2696+
end,
2697+
Frame = rabbit_stream_core:frame({response, CorrelationId, Response}),
2698+
send(Transport, S, Frame),
2699+
{Connection, State};
26442700
handle_frame_post_auth(Transport,
26452701
#stream_connection{socket = S} = Connection,
26462702
State,

deps/rabbitmq_stream/src/rabbit_stream_utils.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,4 +275,5 @@ command_versions() ->
275275
{close, ?VERSION_1, ?VERSION_1},
276276
{heartbeat, ?VERSION_1, ?VERSION_1},
277277
{route, ?VERSION_1, ?VERSION_1},
278-
{partitions, ?VERSION_1, ?VERSION_1}].
278+
{partitions, ?VERSION_1, ?VERSION_1},
279+
{stream_info, ?VERSION_1, ?VERSION_1}].

0 commit comments

Comments
 (0)