Skip to content

Commit d39d79a

Browse files
authored
Enable rabbitmq-client event logging when tests are verbose (#1559)
* Fix event ID issue by duplicating Error Event 3 code for each target framework. * Fix strange null reference exception by not using `??=` lazy init. * * Remove `Message` from `RabbitMqClientEventSource` because it does not work as intended. * Handle task canceled exception in main loop. * Ignore a couple exception cases in the `MainLoop`
1 parent 855f686 commit d39d79a

File tree

8 files changed

+267
-85
lines changed

8 files changed

+267
-85
lines changed

projects/RabbitMQ.Client/client/impl/Connection.Receive.cs

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ await ReceiveLoopAsync(mainLoopToken)
5757
{
5858
// Possible heartbeat exception
5959
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
60-
0, "End of stream",
60+
0,
61+
"End of stream",
6162
exception: eose);
6263
HandleMainLoopException(ea);
6364
}
@@ -73,15 +74,31 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken)
7374
* Ensure that these exceptions eventually make it to application code
7475
*/
7576
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
76-
Constants.InternalError, fileLoadException.Message,
77+
Constants.InternalError,
78+
fileLoadException.Message,
7779
exception: fileLoadException);
7880
HandleMainLoopException(ea);
7981
}
82+
catch (OperationCanceledException ocex)
83+
{
84+
if (mainLoopToken.IsCancellationRequested)
85+
{
86+
// ignore
87+
}
88+
else
89+
{
90+
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
91+
Constants.InternalError,
92+
ocex.Message,
93+
exception: ocex);
94+
HandleMainLoopException(ea);
95+
}
96+
}
8097
catch (Exception ex)
8198
{
8299
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
83100
Constants.InternalError,
84-
$"Unexpected Exception: {ex.Message}",
101+
ex.Message,
85102
exception: ex);
86103
HandleMainLoopException(ea);
87104
}
@@ -91,24 +108,24 @@ await FinishCloseAsync(cts.Token)
91108
.ConfigureAwait(false);
92109
}
93110

94-
private async Task ReceiveLoopAsync(CancellationToken mainLoopCancelllationToken)
111+
private async Task ReceiveLoopAsync(CancellationToken mainLoopCancellationToken)
95112
{
96113
while (false == _closed)
97114
{
98-
mainLoopCancelllationToken.ThrowIfCancellationRequested();
115+
mainLoopCancellationToken.ThrowIfCancellationRequested();
99116

100117
while (_frameHandler.TryReadFrame(out InboundFrame frame))
101118
{
102119
NotifyHeartbeatListener();
103-
await ProcessFrameAsync(frame, mainLoopCancelllationToken)
120+
await ProcessFrameAsync(frame, mainLoopCancellationToken)
104121
.ConfigureAwait(false);
105122
}
106123

107124
// Done reading frames synchronously, go async
108-
InboundFrame asyncFrame = await _frameHandler.ReadFrameAsync(mainLoopCancelllationToken)
125+
InboundFrame asyncFrame = await _frameHandler.ReadFrameAsync(mainLoopCancellationToken)
109126
.ConfigureAwait(false);
110127
NotifyHeartbeatListener();
111-
await ProcessFrameAsync(asyncFrame, mainLoopCancelllationToken)
128+
await ProcessFrameAsync(asyncFrame, mainLoopCancellationToken)
112129
.ConfigureAwait(false);
113130
}
114131
}
@@ -180,17 +197,18 @@ private void HandleMainLoopException(ShutdownEventArgs reason)
180197
string message = reason.GetLogMessage();
181198
if (false == SetCloseReason(reason))
182199
{
183-
LogCloseError($"Unexpected Main Loop Exception while closing: {message}", reason.Exception);
200+
LogCloseError($"unexpected main loop exception while closing: {message}", reason.Exception);
184201
return;
185202
}
186203

187204
_channel0.MaybeSetConnectionStartException(reason.Exception);
188205

189206
OnShutdown(reason);
190-
LogCloseError($"Unexpected connection closure: {message}", reason.Exception);
207+
LogCloseError($"unexpected connection closure: {message}", reason.Exception);
191208
}
192209

193-
private async Task HardProtocolExceptionHandlerAsync(HardProtocolException hpe, CancellationToken cancellationToken)
210+
private async Task HardProtocolExceptionHandlerAsync(HardProtocolException hpe,
211+
CancellationToken mainLoopCancellationToken)
194212
{
195213
if (SetCloseReason(hpe.ShutdownReason))
196214
{
@@ -200,11 +218,11 @@ await _session0.SetSessionClosingAsync(false)
200218
try
201219
{
202220
var cmd = new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0);
203-
await _session0.TransmitAsync(in cmd, cancellationToken)
221+
await _session0.TransmitAsync(in cmd, mainLoopCancellationToken)
204222
.ConfigureAwait(false);
205223
if (hpe.CanShutdownCleanly)
206224
{
207-
await ClosingLoopAsync(cancellationToken)
225+
await ClosingLoopAsync(mainLoopCancellationToken)
208226
.ConfigureAwait(false);
209227
}
210228
}
@@ -222,13 +240,13 @@ await ClosingLoopAsync(cancellationToken)
222240
///<remarks>
223241
/// Loop only used while quiescing. Use only to cleanly close connection
224242
///</remarks>
225-
private async Task ClosingLoopAsync(CancellationToken cancellationToken)
243+
private async Task ClosingLoopAsync(CancellationToken mainLoopCancellationToken)
226244
{
227245
try
228246
{
229247
_frameHandler.ReadTimeout = default;
230248
// Wait for response/socket closure or timeout
231-
await ReceiveLoopAsync(cancellationToken)
249+
await ReceiveLoopAsync(mainLoopCancellationToken)
232250
.ConfigureAwait(false);
233251
}
234252
catch (ObjectDisposedException ode)

projects/RabbitMQ.Client/client/impl/Frame.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,10 @@ private static void ProcessProtocolHeader(ReadOnlySequence<byte> buffer)
254254
}
255255
}
256256

257-
internal static async ValueTask<InboundFrame> ReadFromPipeAsync(PipeReader reader, uint maxMessageSize, CancellationToken cancellationToken)
257+
internal static async ValueTask<InboundFrame> ReadFromPipeAsync(PipeReader reader, uint maxMessageSize,
258+
CancellationToken mainLoopCancellationToken)
258259
{
259-
ReadResult result = await reader.ReadAsync(cancellationToken)
260+
ReadResult result = await reader.ReadAsync(mainLoopCancellationToken)
260261
.ConfigureAwait(false);
261262

262263
ReadOnlySequence<byte> buffer = result.Buffer;
@@ -270,7 +271,7 @@ internal static async ValueTask<InboundFrame> ReadFromPipeAsync(PipeReader reade
270271
reader.AdvanceTo(buffer.Start, buffer.End);
271272

272273
// Not enough data, read a bit more
273-
result = await reader.ReadAsync(cancellationToken)
274+
result = await reader.ReadAsync(mainLoopCancellationToken)
274275
.ConfigureAwait(false);
275276

276277
MaybeThrowEndOfStream(result, buffer);

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,9 +278,9 @@ await _pipeReader.CompleteAsync()
278278
}
279279
}
280280

281-
public ValueTask<InboundFrame> ReadFrameAsync(CancellationToken cancellationToken)
281+
public ValueTask<InboundFrame> ReadFrameAsync(CancellationToken mainLoopCancellationToken)
282282
{
283-
return InboundFrame.ReadFromPipeAsync(_pipeReader, _amqpTcpEndpoint.MaxMessageSize, cancellationToken);
283+
return InboundFrame.ReadFromPipeAsync(_pipeReader, _amqpTcpEndpoint.MaxMessageSize, mainLoopCancellationToken);
284284
}
285285

286286
public bool TryReadFrame(out InboundFrame frame)

projects/RabbitMQ.Client/client/logging/ESLog.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@
2929
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System;
33+
3234
namespace RabbitMQ.Client.Logging
3335
{
3436
internal static class ESLog
3537
{
3638
public static void Info(string message)
3739
{
38-
Logging.RabbitMqClientEventSource.Log.Info(message);
40+
RabbitMqClientEventSource.Log.Info(message);
3941
}
4042

4143
public static void Info(string message, params object[] args)
@@ -46,7 +48,7 @@ public static void Info(string message, params object[] args)
4648

4749
public static void Warn(string message)
4850
{
49-
Logging.RabbitMqClientEventSource.Log.Warn(message);
51+
RabbitMqClientEventSource.Log.Warn(message);
5052
}
5153

5254
public static void Warn(string message, params object[] args)
@@ -55,12 +57,12 @@ public static void Warn(string message, params object[] args)
5557
Warn(msg);
5658
}
5759

58-
public static void Error(string message, System.Exception ex)
60+
public static void Error(string message, Exception ex)
5961
{
60-
Logging.RabbitMqClientEventSource.Log.Error(message, ex);
62+
RabbitMqClientEventSource.Log.Error(message, ex);
6163
}
6264

63-
public static void Error(string message, System.Exception ex, params object[] args)
65+
public static void Error(string message, Exception ex, params object[] args)
6466
{
6567
string msg = string.Format(message, args);
6668
Error(msg, ex);

projects/RabbitMQ.Client/client/logging/RabbitMqClientEventSource.Counters.cs

Lines changed: 16 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -29,93 +29,63 @@
2929
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3030
//---------------------------------------------------------------------------
3131

32-
using System;
3332
using System.Diagnostics.Tracing;
3433
using System.Threading;
3534

3635
namespace RabbitMQ.Client.Logging
3736
{
38-
#nullable enable
3937
internal sealed partial class RabbitMqClientEventSource
4038
{
41-
private static int ConnectionsOpened;
42-
private static int ConnectionsClosed;
43-
private static int ChannelsOpened;
44-
private static int ChannelsClosed;
45-
private static long BytesSent;
46-
private static long BytesReceived;
47-
private static long CommandsSent;
48-
private static long CommandsReceived;
39+
private static int s_connectionsOpened;
40+
private static int s_connectionsClosed;
41+
private static int s_channelsOpened;
42+
private static int s_channelsClosed;
43+
private static long s_bytesSent;
44+
private static long s_bytesReceived;
45+
private static long s_commandsSent;
46+
private static long s_commandsReceived;
4947

50-
#if NET6_0_OR_GREATER
51-
private PollingCounter? _connectionOpenedCounter;
52-
private PollingCounter? _openConnectionCounter;
53-
private PollingCounter? _channelOpenedCounter;
54-
private PollingCounter? _openChannelCounter;
55-
private IncrementingPollingCounter? _bytesSentCounter;
56-
private IncrementingPollingCounter? _bytesReceivedCounter;
57-
private IncrementingPollingCounter? _commandSentCounter;
58-
private IncrementingPollingCounter? _commandReceivedCounter;
59-
60-
protected override void OnEventCommand(EventCommandEventArgs command)
61-
{
62-
if (command.Command == EventCommand.Enable)
63-
{
64-
_connectionOpenedCounter ??= new PollingCounter("total-connections-opened", this, () => ConnectionsOpened) { DisplayName = "Total connections opened" };
65-
_openConnectionCounter ??= new PollingCounter("current-open-connections", this, () => ConnectionsOpened - ConnectionsClosed) { DisplayName = "Current open connections count" };
66-
67-
_channelOpenedCounter ??= new PollingCounter("total-channels-opened", this, () => ChannelsOpened) { DisplayName = "Total channels opened" };
68-
_openChannelCounter ??= new PollingCounter("current-open-channels", this, () => ChannelsOpened - ChannelsClosed) { DisplayName = "Current open channels count" };
69-
70-
_bytesSentCounter ??= new IncrementingPollingCounter("bytes-sent-rate", this, () => Interlocked.Read(ref BytesSent)) { DisplayName = "Byte sending rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
71-
_bytesReceivedCounter ??= new IncrementingPollingCounter("bytes-received-rate", this, () => Interlocked.Read(ref BytesReceived)) { DisplayName = "Byte receiving rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
72-
73-
_commandSentCounter ??= new IncrementingPollingCounter("AMQP-method-sent-rate", this, () => Interlocked.Read(ref CommandsSent)) { DisplayName = "AMQP method sending rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
74-
_commandReceivedCounter ??= new IncrementingPollingCounter("AMQP-method-received-rate", this, () => Interlocked.Read(ref CommandsReceived)) { DisplayName = "AMQP method receiving rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
75-
}
76-
}
77-
#endif
7848
[NonEvent]
7949
public void ConnectionOpened()
8050
{
81-
Interlocked.Increment(ref ConnectionsOpened);
51+
Interlocked.Increment(ref s_connectionsOpened);
8252
}
8353

8454
[NonEvent]
8555
public void ConnectionClosed()
8656
{
87-
Interlocked.Increment(ref ConnectionsClosed);
57+
Interlocked.Increment(ref s_connectionsClosed);
8858
}
8959

9060
[NonEvent]
9161
public void ChannelOpened()
9262
{
93-
Interlocked.Increment(ref ChannelsOpened);
63+
Interlocked.Increment(ref s_channelsOpened);
9464
}
9565

9666
[NonEvent]
9767
public void ChannelClosed()
9868
{
99-
Interlocked.Increment(ref ChannelsClosed);
69+
Interlocked.Increment(ref s_channelsClosed);
10070
}
10171

10272
[NonEvent]
10373
public void DataReceived(int byteCount)
10474
{
105-
Interlocked.Add(ref BytesReceived, byteCount);
75+
Interlocked.Add(ref s_bytesReceived, byteCount);
10676
}
10777

10878
[NonEvent]
10979
public void CommandSent(int byteCount)
11080
{
111-
Interlocked.Increment(ref CommandsSent);
112-
Interlocked.Add(ref BytesSent, byteCount);
81+
Interlocked.Increment(ref s_commandsSent);
82+
Interlocked.Add(ref s_bytesSent, byteCount);
11383
}
11484

11585
[NonEvent]
11686
public void CommandReceived()
11787
{
118-
Interlocked.Increment(ref CommandsReceived);
88+
Interlocked.Increment(ref s_commandsReceived);
11989
}
12090
}
12191
}

0 commit comments

Comments
 (0)