Skip to content

Commit 98dd6ac

Browse files
authored
Fix autorecconetion in query metadata (#411)
* Fix autorecconetion in query metadata --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 1573214 commit 98dd6ac

File tree

3 files changed

+85
-33
lines changed

3 files changed

+85
-33
lines changed

RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
// 2.0, and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using System;
56
using System.Collections.Concurrent;
67
using System.Threading.Tasks;
8+
using Microsoft.Extensions.Logging;
79

810
namespace RabbitMQ.Stream.Client.Reliable;
911

@@ -62,16 +64,32 @@ private async Task<IConsumer> StandardConsumer(bool boot)
6264
if (IsClosedNormally(closeReason))
6365
return;
6466

65-
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
66-
FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
67+
try
68+
{
69+
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
70+
FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
71+
}
72+
catch (Exception e)
73+
{
74+
BaseLogger?.LogError(e,
75+
$"Stream consumer.MetadataHandler error. Auto recovery failed for: {ToString()}");
76+
}
6777
},
6878
MetadataHandler = async _ =>
6979
{
7080
if (IsClosedNormally())
7181
return;
7282

73-
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
74-
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
83+
try
84+
{
85+
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
86+
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
87+
}
88+
catch (Exception e)
89+
{
90+
BaseLogger?.LogError(e,
91+
$"Stream consumer.MetadataHandler error. Auto recovery failed for: {ToString()}");
92+
}
7593
},
7694
MessageHandler = async (consumer, ctx, message) =>
7795
{
@@ -129,21 +147,36 @@ private async Task<IConsumer> SuperConsumer(bool boot)
129147
await RandomWait().ConfigureAwait(false);
130148
if (IsClosedNormally(closeReason))
131149
return;
132-
133-
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
134-
await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
135-
FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
150+
try
151+
{
152+
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
153+
await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
154+
FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
155+
}
156+
catch (Exception e)
157+
{
158+
BaseLogger?.LogError(e,
159+
$"Super stream consumer. ConnectionClosedHandler error. Auto recovery failed for stream: {_consumerConfig.Stream}");
160+
}
136161
},
137162
MetadataHandler = async update =>
138163
{
139-
await RandomWait().ConfigureAwait(false);
140-
if (IsClosedNormally())
141-
return;
142-
143-
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
144-
await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
145-
ChangeStatusReason.MetaDataUpdate)
146-
.ConfigureAwait(false);
164+
try
165+
{
166+
await RandomWait().ConfigureAwait(false);
167+
if (IsClosedNormally())
168+
return;
169+
170+
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
171+
await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
172+
ChangeStatusReason.MetaDataUpdate)
173+
.ConfigureAwait(false);
174+
}
175+
catch (Exception e)
176+
{
177+
BaseLogger?.LogError(e,
178+
$"Super stream consumer.MetadataHandler error. Auto recovery failed stream: {_consumerConfig.Stream}");
179+
}
147180
},
148181
MessageHandler = async (partitionStream, consumer, ctx, message) =>
149182
{

RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
// 2.0, and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using System;
56
using System.Threading.Tasks;
7+
using Microsoft.Extensions.Logging;
68

79
namespace RabbitMQ.Stream.Client.Reliable;
810

@@ -49,21 +51,36 @@ private async Task<IProducer> SuperStreamProducer(bool boot)
4951
await RandomWait().ConfigureAwait(false);
5052
if (IsClosedNormally(closeReason))
5153
return;
52-
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
53-
await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r,
54-
ReliableBase.FromConnectionClosedReasonToStatusReason(closeReason))
55-
.ConfigureAwait(false);
54+
try
55+
{
56+
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
57+
await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r,
58+
FromConnectionClosedReasonToStatusReason(closeReason))
59+
.ConfigureAwait(false);
60+
}
61+
catch (Exception e)
62+
{
63+
BaseLogger?.LogError(e,
64+
$"Super stream producer. ConnectionClosedHandler error. Auto recovery failed for stream: {_producerConfig.Stream}");
65+
}
5666
},
5767
MetadataHandler = async update =>
5868
{
5969
await RandomWait().ConfigureAwait(false);
6070
if (IsClosedNormally())
6171
return;
62-
63-
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
64-
await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r,
65-
ChangeStatusReason.MetaDataUpdate)
66-
.ConfigureAwait(false);
72+
try
73+
{
74+
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
75+
await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r,
76+
ChangeStatusReason.MetaDataUpdate)
77+
.ConfigureAwait(false);
78+
}
79+
catch (Exception e)
80+
{
81+
BaseLogger?.LogError(e,
82+
$"Super stream producer. MetadataHandler error. Auto recovery failed stream: {_producerConfig.Stream}");
83+
}
6784
},
6885
ConfirmHandler = confirmationHandler =>
6986
{

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -271,10 +271,11 @@ private async Task Init(bool boot)
271271
/// <param name="stream">stream name</param>
272272
/// <param name="system">stream system</param>
273273
/// <returns></returns>
274-
private async Task<bool> CheckIfStreamIsAvailable(string stream, StreamSystem system)
274+
private async Task<(bool, StreamInfo)> CheckIfStreamIsAvailable(string stream, StreamSystem system)
275275
{
276276
await Task.Delay(Consts.RandomMid()).ConfigureAwait(false);
277277
var exists = false;
278+
StreamInfo streamInfo = default;
278279
var tryAgain = true;
279280
while (tryAgain)
280281
{
@@ -286,7 +287,7 @@ private async Task<bool> CheckIfStreamIsAvailable(string stream, StreamSystem sy
286287
{
287288
// It is not enough to check if the stream exists
288289
// we need to check if the stream has the leader
289-
var streamInfo = await system.StreamInfo(stream).ConfigureAwait(false);
290+
streamInfo = await system.StreamInfo(stream).ConfigureAwait(false);
290291
ClientExceptions.CheckLeader(streamInfo);
291292
available += " and has a valid leader";
292293
}
@@ -304,7 +305,7 @@ await _reliableConfig.ResourceAvailableReconnectStrategy
304305
}
305306

306307
if (exists)
307-
return true;
308+
return (true, streamInfo);
308309
// In this case the stream doesn't exist anymore or it failed to check if the stream exists
309310
// too many tentatives for the reconnection strategy
310311
// the Entity is just closed.
@@ -316,7 +317,7 @@ await _reliableConfig.ResourceAvailableReconnectStrategy
316317
ToString()
317318
);
318319

319-
return false;
320+
return (false, default);
320321
}
321322

322323
// <summary>
@@ -409,16 +410,17 @@ internal async Task OnEntityClosed(StreamSystem system, string stream,
409410
Func<StreamInfo, Task> reconnectPartitionFunc, ChangeStatusReason reason)
410411
{
411412
var streamExists = false;
413+
412414
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
413415
UpdateStatus(ReliableEntityStatus.Reconnection, reason,
414416
[stream]);
415417
try
416418
{
417-
streamExists = await CheckIfStreamIsAvailable(stream, system)
419+
var (localStreamExists, streamInfo) = await CheckIfStreamIsAvailable(stream, system)
418420
.ConfigureAwait(false);
419-
if (streamExists)
421+
streamExists = localStreamExists;
422+
if (streamExists && !streamInfo.Equals(default))
420423
{
421-
var streamInfo = await system.StreamInfo(stream).ConfigureAwait(false);
422424
await MaybeReconnectPartition(streamInfo, ToString(), reconnectPartitionFunc).ConfigureAwait(false);
423425
}
424426
}
@@ -446,7 +448,7 @@ internal async Task OnEntityClosed(StreamSystem system, string stream, ChangeSta
446448
UpdateStatus(ReliableEntityStatus.Reconnection, reason, [stream]);
447449
try
448450
{
449-
streamExists = await CheckIfStreamIsAvailable(stream, system)
451+
(streamExists, _) = await CheckIfStreamIsAvailable(stream, system)
450452
.ConfigureAwait(false);
451453
if (streamExists)
452454
{

0 commit comments

Comments
 (0)