Skip to content

Commit 9fcff60

Browse files
committed
Added tests for non-copying body
1 parent 3e47e60 commit 9fcff60

12 files changed

+290
-11
lines changed

projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ internal class RentedOutgoingMemory : IDisposable, IResettable
1818

1919
internal int Size => (int) Data.Length;
2020

21+
public int RentedArraySize => _rentedArray?.Length ?? 0;
22+
2123
internal ReadOnlySequence<byte> Data { get; private set; }
2224

2325
/// <summary>
@@ -100,7 +102,7 @@ bool IResettable.TryReset()
100102
return true;
101103
}
102104

103-
public static RentedOutgoingMemory GetAndInitialize(ReadOnlySequence<byte> mem, byte[] buffer, bool waitSend = false)
105+
public static RentedOutgoingMemory GetAndInitialize(ReadOnlySequence<byte> mem, byte[]? buffer = null, bool waitSend = false)
104106
{
105107
var rented = s_pool.Get();
106108

@@ -115,7 +117,7 @@ public static RentedOutgoingMemory GetAndInitialize(ReadOnlySequence<byte> mem,
115117
return rented;
116118
}
117119

118-
public static RentedOutgoingMemory GetAndInitialize(ReadOnlyMemory<byte> mem, byte[] buffer, bool waitSend = false)
120+
public static RentedOutgoingMemory GetAndInitialize(ReadOnlyMemory<byte> mem, byte[]? buffer = null, bool waitSend = false)
119121
{
120122
return GetAndInitialize(new ReadOnlySequence<byte>(mem), buffer, waitSend);
121123
}

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -763,7 +763,12 @@ public async ValueTask<IConnection> CreateConnectionAsync(IEndpointResolver endp
763763
}
764764
}
765765

766-
private ConnectionConfig CreateConfig(string clientProvidedName)
766+
internal ConnectionConfig CreateConfig()
767+
{
768+
return CreateConfig(ClientProvidedName);
769+
}
770+
771+
internal ConnectionConfig CreateConfig(string clientProvidedName)
767772
{
768773
return new ConnectionConfig(
769774
VirtualHost,

projects/RabbitMQ.Client/client/api/IChannelExtensions.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,14 @@ public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationA
140140
public static void BasicPublish(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
141141
=> channel.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
142142

143-
public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
144-
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
143+
public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool? copyBody = null)
144+
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory, copyBody);
145145

146-
public static void BasicPublish(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
146+
public static void BasicPublish(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool? copyBody = null)
147147
=> channel.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
148148

149-
public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
150-
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
149+
public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool? copyBody = null)
150+
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory, copyBody);
151151
#nullable disable
152152

153153
/// <summary>

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ public interface IConnection : INetworkConnection, IDisposable
125125
/// </summary>
126126
IEnumerable<ShutdownReportEntry> ShutdownReport { get; }
127127

128+
/// <summary>
129+
/// The threshold for when to copy the body to a temporary array.
130+
/// </summary>
131+
int CopyBodyToMemoryThreshold { get; }
132+
128133
/// <summary>
129134
/// Application-specific connection name, will be displayed in the management UI
130135
/// if RabbitMQ server supports it. This value doesn't have to be unique and cannot

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ internal sealed partial class AutorecoveringConnection : IConnection
5050
private Connection _innerConnection;
5151
private bool _disposed;
5252

53-
private Connection InnerConnection
53+
internal Connection InnerConnection
5454
{
5555
get
5656
{
@@ -181,6 +181,8 @@ public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
181181

182182
public IEnumerable<ShutdownReportEntry> ShutdownReport => InnerConnection.ShutdownReport;
183183

184+
public int CopyBodyToMemoryThreshold => InnerConnection.CopyBodyToMemoryThreshold;
185+
184186
public IProtocol Protocol => Endpoint.Protocol;
185187

186188
public RecoveryAwareChannel CreateNonRecoveringChannel()

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ internal sealed partial class Connection : IConnection
5858
private ShutdownEventArgs? _closeReason;
5959
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
6060

61+
internal bool TrackRentedBytes = false;
62+
internal uint RentedBytes;
63+
6164
internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
6265
{
6366
_config = config;
@@ -552,9 +555,23 @@ internal void Write(RentedOutgoingMemory frames)
552555

553556
internal ValueTask WriteAsync(RentedOutgoingMemory frames)
554557
{
558+
TrackRented(frames.RentedArraySize);
559+
555560
return _frameHandler.WriteAsync(frames);
556561
}
557562

563+
private void TrackRented(int size)
564+
{
565+
if (TrackRentedBytes && size > 0)
566+
{
567+
#if NET
568+
Interlocked.Add(ref RentedBytes, (uint)size);
569+
#else
570+
Interlocked.Add(ref Unsafe.As<uint, int>(ref RentedBytes), size);
571+
#endif
572+
}
573+
}
574+
558575
public void Dispose()
559576
{
560577
if (_disposed)

projects/RabbitMQ.Client/client/impl/SessionBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader head
180180
ThrowAlreadyClosedException();
181181
}
182182

183-
copyBody ??= body.Length >= Connection.CopyBodyToMemoryThreshold;
183+
copyBody ??= body.Length > Connection.CopyBodyToMemoryThreshold;
184184

185185
return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize, copyBody.Value));
186186
}

projects/Test/AsyncIntegration/TestBasicPublishAsync.cs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,5 +67,76 @@ public async Task TestQueuePurgeAsync()
6767
Assert.True(await publishSyncSource.Task);
6868
Assert.Equal((uint)messageCount, await _channel.QueuePurgeAsync(q));
6969
}
70+
71+
[Fact]
72+
public async Task TestNonCopyingBody()
73+
{
74+
const int size = 1024;
75+
76+
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
77+
byte[] body = GetRandomBody(size);
78+
79+
uint rentedBytes;
80+
81+
using (var result = await TrackRentedBytes())
82+
{
83+
await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: false);
84+
rentedBytes = result.RentedBytes;
85+
}
86+
87+
Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));
88+
89+
// It is expected that the rented bytes is smaller than the size of the body
90+
// since we're not copying the body. Only the frame headers are rented.
91+
Assert.True(rentedBytes < size);
92+
}
93+
94+
[Fact]
95+
public async Task TestCopyingBody()
96+
{
97+
const int size = 1024;
98+
99+
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
100+
byte[] body = GetRandomBody(size);
101+
102+
uint rentedBytes;
103+
104+
using (var result = await TrackRentedBytes())
105+
{
106+
await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: true);
107+
rentedBytes = result.RentedBytes;
108+
}
109+
110+
Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));
111+
112+
// It is expected that the rented bytes is larger than the size of the body
113+
// since the body is copied with the frame headers.
114+
Assert.True(rentedBytes >= size);
115+
}
116+
117+
[Fact]
118+
public async Task TestDefaultCopyingBody()
119+
{
120+
Assert.Equal(int.MaxValue, _conn.CopyBodyToMemoryThreshold);
121+
122+
const int size = 1024;
123+
124+
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
125+
byte[] body = GetRandomBody(size);
126+
127+
uint rentedBytes;
128+
129+
using (var result = await TrackRentedBytes())
130+
{
131+
await _channel.BasicPublishAsync(string.Empty, q, body, copyBody: true);
132+
rentedBytes = result.RentedBytes;
133+
}
134+
135+
Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));
136+
137+
// It is expected that the rented bytes is larger than the size of the body
138+
// since the body is copied with the frame headers.
139+
Assert.True(rentedBytes >= size);
140+
}
70141
}
71142
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
using System.Threading.Tasks;
2+
using RabbitMQ.Client;
3+
using Xunit;
4+
using Xunit.Abstractions;
5+
6+
namespace Test.AsyncIntegration;
7+
8+
public class TestBasicPublishCopyBodyAsync : AsyncIntegrationFixture
9+
{
10+
public TestBasicPublishCopyBodyAsync(ITestOutputHelper output) : base(output)
11+
{
12+
}
13+
14+
protected override ConnectionFactory CreateConnectionFactory()
15+
{
16+
var factory = base.CreateConnectionFactory();
17+
factory.CopyBodyToMemoryThreshold = 1024;
18+
return factory;
19+
}
20+
21+
[Theory(Skip = "Parallelization is disabled for this collection")]
22+
[InlineData(512)]
23+
[InlineData(1024)]
24+
public async Task TestNonCopyingBody(ushort size)
25+
{
26+
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
27+
byte[] body = GetRandomBody(size);
28+
29+
uint rentedBytes;
30+
31+
using (var result = await TrackRentedBytes())
32+
{
33+
await _channel.BasicPublishAsync(string.Empty, q, body);
34+
rentedBytes = result.RentedBytes;
35+
}
36+
37+
Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));
38+
39+
// It is expected that the rented bytes is smaller than the size of the body
40+
// since we're not copying the body. Only the frame headers are rented.
41+
Assert.True(rentedBytes < size);
42+
}
43+
44+
[Theory]
45+
[InlineData(1025)]
46+
[InlineData(2048)]
47+
public async Task TestCopyingBody(ushort size)
48+
{
49+
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
50+
byte[] body = GetRandomBody(size);
51+
52+
uint rentedBytes;
53+
54+
using (var result = await TrackRentedBytes())
55+
{
56+
await _channel.BasicPublishAsync(string.Empty, q, body);
57+
rentedBytes = result.RentedBytes;
58+
}
59+
60+
Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));
61+
62+
// It is expected that the rented bytes is larger than the size of the body
63+
// since the body is copied with the frame headers.
64+
Assert.True(rentedBytes >= size);
65+
}
66+
}

projects/Test/Common/IntegrationFixtureBase.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
using System.Reflection;
3636
using System.Text;
3737
using System.Threading;
38+
using System.Threading.Tasks;
3839
using RabbitMQ.Client;
3940
using RabbitMQ.Client.Framing.Impl;
4041
using Xunit;
@@ -44,6 +45,8 @@ namespace Test
4445
{
4546
public abstract class IntegrationFixtureBase : IDisposable
4647
{
48+
private readonly SemaphoreSlim _byteTrackingLock = new SemaphoreSlim(1, 1);
49+
4750
private static bool s_isRunningInCI = false;
4851
private static bool s_isWindows = false;
4952
private static bool s_isVerbose = false;
@@ -292,7 +295,7 @@ protected void Wait(ManualResetEventSlim latch, TimeSpan timeSpan, string desc)
292295
$"waiting {timeSpan.TotalSeconds} seconds on a latch for '{desc}' timed out");
293296
}
294297

295-
protected ConnectionFactory CreateConnectionFactory()
298+
protected virtual ConnectionFactory CreateConnectionFactory()
296299
{
297300
string now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);
298301
return new ConnectionFactory
@@ -339,6 +342,39 @@ protected void HandleChannelShutdown(IChannel ch, ShutdownEventArgs args, Action
339342
a(args);
340343
}
341344

345+
protected async Task<TrackRentedByteResult> TrackRentedBytes()
346+
{
347+
Connection connection;
348+
349+
if (_conn is AutorecoveringConnection autorecoveringConnection)
350+
{
351+
connection = autorecoveringConnection.InnerConnection as Connection;
352+
}
353+
else
354+
{
355+
connection = _conn as Connection;
356+
}
357+
358+
if (connection is null)
359+
{
360+
throw new InvalidOperationException("Cannot track rented bytes without a connection");
361+
}
362+
363+
await _byteTrackingLock.WaitAsync();
364+
365+
try
366+
{
367+
connection.RentedBytes = 0;
368+
connection.TrackRentedBytes = true;
369+
return new TrackRentedByteResult(connection, _byteTrackingLock);
370+
}
371+
catch
372+
{
373+
_byteTrackingLock.Release();
374+
throw;
375+
}
376+
}
377+
342378
private static void InitIsRunningInCI()
343379
{
344380
bool ci;
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using System;
2+
using System.Threading;
3+
using RabbitMQ.Client.Framing.Impl;
4+
5+
namespace Test;
6+
7+
public sealed class TrackRentedByteResult : IDisposable
8+
{
9+
private readonly Connection _connection;
10+
private readonly SemaphoreSlim _byteTrackingLock;
11+
12+
internal TrackRentedByteResult(Connection connection, SemaphoreSlim byteTrackingLock)
13+
{
14+
_connection = connection;
15+
_byteTrackingLock = byteTrackingLock;
16+
}
17+
18+
public uint RentedBytes => _connection.RentedBytes;
19+
20+
public void Dispose()
21+
{
22+
_byteTrackingLock.Release();
23+
}
24+
}

0 commit comments

Comments
 (0)