Skip to content

Commit 3862722

Browse files
authored
Merge pull request #1794 from rabbitmq/rabbitmq-dotnet-client-1793
Fix handling when rate limit lease can't be acquired.
2 parents 13353a2 + 068bc00 commit 3862722

File tree

4 files changed

+153
-4
lines changed

4 files changed

+153
-4
lines changed

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,24 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn
344344
{
345345
if (_publisherConfirmationsEnabled)
346346
{
347-
_confirmSemaphore.Release();
347+
try
348+
{
349+
_confirmSemaphore.Release();
350+
}
351+
catch (SemaphoreFullException ex)
352+
{
353+
/*
354+
* rabbitmq/rabbitmq-dotnet-client-1793
355+
* If MaybeStartPublisherConfirmationTracking throws an exception *prior* to acquiring
356+
* _confirmSemaphore, the above Release() call will throw SemaphoreFullException.
357+
* In "normal" cases, publisherConfirmationInfo will thus be null, but if not, throw
358+
* a "bug found" exception here.
359+
*/
360+
if (publisherConfirmationInfo is not null)
361+
{
362+
throw new InvalidOperationException(InternalConstants.BugFound, ex);
363+
}
364+
}
348365

349366
if (publisherConfirmationInfo is not null)
350367
{

projects/RabbitMQ.Client/ThrottlingRateLimiter.cs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,13 +134,24 @@ protected override void Dispose(bool disposing)
134134

135135
private int CalculateDelay()
136136
{
137-
long? availablePermits = _concurrencyLimiter.GetStatistics()?.CurrentAvailablePermits;
138-
if (!(availablePermits < _throttlingThreshold))
137+
RateLimiterStatistics? rateLimiterStatistics = _concurrencyLimiter.GetStatistics();
138+
if (rateLimiterStatistics is null)
139139
{
140140
return 0;
141141
}
142142

143-
return (int)((1.0 - availablePermits / (double)_maxConcurrency) * 1000);
143+
long availablePermits = rateLimiterStatistics.CurrentAvailablePermits;
144+
if (availablePermits >= _throttlingThreshold)
145+
{
146+
/*
147+
* Note: do NOT add a delay because available permits exceeeds the threshold
148+
* below which throttling begins
149+
*/
150+
return 0;
151+
}
152+
153+
double percentageUsed = 1.0 - (availablePermits / (double)_maxConcurrency);
154+
return (int)(percentageUsed * 1000);
144155
}
145156
}
146157
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Collections.Generic;
34+
using System.Threading;
35+
using System.Threading.RateLimiting;
36+
using System.Threading.Tasks;
37+
38+
namespace Test.Integration.GH
39+
{
40+
public class NeverAcquiredRateLimiter : RateLimiter
41+
{
42+
public override TimeSpan? IdleDuration => throw new NotImplementedException();
43+
public override RateLimiterStatistics GetStatistics() => throw new NotImplementedException();
44+
45+
protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
46+
{
47+
return new ValueTask<RateLimitLease>(new NotAcquiredRateLimitLease());
48+
}
49+
50+
protected override RateLimitLease AttemptAcquireCore(int permitCount)
51+
{
52+
return new NotAcquiredRateLimitLease();
53+
}
54+
}
55+
56+
public class NotAcquiredRateLimitLease : RateLimitLease
57+
{
58+
public override bool IsAcquired => false;
59+
60+
public override IEnumerable<string> MetadataNames => [];
61+
62+
public override bool TryGetMetadata(string metadataName, out object metadata)
63+
{
64+
metadata = string.Empty;
65+
return true;
66+
}
67+
}
68+
}

projects/Test/Integration/GH/TestGitHubIssues.cs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,5 +329,58 @@ await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
329329
_output.WriteLine("saw {0} publishExceptions", publishExceptions.Count);
330330
}
331331
}
332+
333+
[Fact]
334+
public async Task MaybeSomethingUpWithRateLimiter_GH1793()
335+
{
336+
const int messageCount = 16;
337+
338+
_connFactory = new ConnectionFactory
339+
{
340+
AutomaticRecoveryEnabled = true
341+
};
342+
343+
_conn = await _connFactory.CreateConnectionAsync();
344+
345+
var channelOpts = new CreateChannelOptions(
346+
publisherConfirmationsEnabled: true,
347+
publisherConfirmationTrackingEnabled: true,
348+
outstandingPublisherConfirmationsRateLimiter: new NeverAcquiredRateLimiter()
349+
);
350+
351+
_channel = await _conn.CreateChannelAsync(channelOpts);
352+
353+
var properties = new BasicProperties
354+
{
355+
DeliveryMode = DeliveryModes.Persistent
356+
};
357+
358+
for (int i = 0; i < messageCount; i++)
359+
{
360+
int retryCount = 0;
361+
const int maxRetries = 3;
362+
while (retryCount <= maxRetries)
363+
{
364+
try
365+
{
366+
byte[] bytes = Encoding.UTF8.GetBytes("message");
367+
await Assert.ThrowsAnyAsync<InvalidOperationException>(async () =>
368+
{
369+
await _channel.BasicPublishAsync(string.Empty, string.Empty, true, properties, bytes);
370+
});
371+
break;
372+
}
373+
catch (SemaphoreFullException ex0)
374+
{
375+
_output.WriteLine("{0} ex: {1}", _testDisplayName, ex0);
376+
retryCount++;
377+
}
378+
catch (PublishException)
379+
{
380+
retryCount++;
381+
}
382+
}
383+
}
384+
}
332385
}
333386
}

0 commit comments

Comments
 (0)