Skip to content

Commit 3e47e60

Browse files
committed
Add configuration to set the copy threshold
1 parent 1a7dd75 commit 3e47e60

File tree

8 files changed

+51
-20
lines changed

8 files changed

+51
-20
lines changed

projects/RabbitMQ.Client/client/api/ConnectionConfig.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,16 @@ public sealed class ConnectionConfig
143143
/// </summary>
144144
public readonly int DispatchConsumerConcurrency;
145145

146+
/// <summary>
147+
/// The threshold for when to copy the body to a temporary array.
148+
/// </summary>
149+
/// <remarks>
150+
/// When the body is larger than this threshold it will reuse the same buffer. Because of this
151+
/// the buffer cannot be modified by the application. This causes
152+
/// the socket (<see cref="SocketFrameHandler.WriteAsync"/>) to block until the frame is sent.
153+
/// </remarks>
154+
public readonly int CopyBodyToMemoryThreshold;
155+
146156
internal readonly Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> FrameHandlerFactoryAsync;
147157

148158
internal ConnectionConfig(string virtualHost, string userName, string password,
@@ -153,7 +163,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
153163
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
154164
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
155165
bool dispatchConsumersAsync, int dispatchConsumerConcurrency,
156-
Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
166+
Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync, int copyBodyToMemoryThreshold)
157167
{
158168
VirtualHost = virtualHost;
159169
UserName = userName;
@@ -176,6 +186,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
176186
DispatchConsumersAsync = dispatchConsumersAsync;
177187
DispatchConsumerConcurrency = dispatchConsumerConcurrency;
178188
FrameHandlerFactoryAsync = frameHandlerFactoryAsync;
189+
CopyBodyToMemoryThreshold = copyBodyToMemoryThreshold;
179190
}
180191
}
181192
}

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
138138
/// </summary>
139139
public const string DefaultVHost = "/";
140140

141+
/// <summary>
142+
/// Default value for the copy body to memory threshold.
143+
/// </summary>
144+
public const int DefaultCopyBodyToMemoryThreshold = int.MaxValue;
145+
141146
/// <summary>
142147
/// TLS versions enabled by default: TLSv1.2, v1.1, v1.0.
143148
/// </summary>
@@ -361,6 +366,16 @@ public AmqpTcpEndpoint Endpoint
361366
/// </summary>
362367
public uint MaxMessageSize { get; set; } = DefaultMaxMessageSize;
363368

369+
/// <summary>
370+
/// The threshold for when to copy the body to a temporary array.
371+
/// </summary>
372+
/// <remarks>
373+
/// When the body is larger than this threshold it will reuse the same buffer. Because of this
374+
/// the buffer cannot be modified by the application. This causes
375+
/// the socket (<see cref="SocketFrameHandler.WriteAsync"/>) to block until the frame is sent.
376+
/// </remarks>
377+
public int CopyBodyToMemoryThreshold { get; set; } = DefaultCopyBodyToMemoryThreshold;
378+
364379
/// <summary>
365380
/// The uri to use for the connection.
366381
/// </summary>
@@ -771,7 +786,8 @@ private ConnectionConfig CreateConfig(string clientProvidedName)
771786
RequestedConnectionTimeout,
772787
DispatchConsumersAsync,
773788
ConsumerDispatchConcurrency,
774-
CreateFrameHandlerAsync);
789+
CreateFrameHandlerAsync,
790+
CopyBodyToMemoryThreshold);
775791
}
776792

777793
internal async Task<IFrameHandler> CreateFrameHandlerAsync(

projects/RabbitMQ.Client/client/api/IChannel.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, i
259259
/// Routing key must be shorter than 255 bytes.
260260
/// </para>
261261
/// </remarks>
262-
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool copyBody = true)
262+
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool? copyBody = null)
263263
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
264264

265265
/// <summary>
@@ -270,7 +270,7 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in
270270
/// Routing key must be shorter than 255 bytes.
271271
/// </para>
272272
/// </remarks>
273-
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body = default, bool mandatory = false, bool copyBody = false)
273+
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body = default, bool mandatory = false, bool? copyBody = null)
274274
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
275275

276276
/// <summary>
@@ -281,7 +281,7 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in
281281
/// Routing key must be shorter than 255 bytes.
282282
/// </para>
283283
/// </remarks>
284-
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool copyBody = true)
284+
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool? copyBody = null)
285285
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
286286

287287
/// <summary>
@@ -292,7 +292,7 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
292292
/// Routing key must be shorter than 255 bytes.
293293
/// </para>
294294
/// </remarks>
295-
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body = default, bool mandatory = false, bool copyBody = true)
295+
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body = default, bool mandatory = false, bool? copyBody = null)
296296
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
297297

298298
#nullable disable

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -328,19 +328,19 @@ public void BasicPublish<TProperties>(CachedString exchange, CachedString routin
328328
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
329329
=> InnerChannel.BasicPublish(exchange, routingKey, in basicProperties, body, mandatory);
330330

331-
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory, bool copyBody = true)
331+
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory, bool? copyBody = null)
332332
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
333333
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody);
334334

335-
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body, bool mandatory, bool copyBody = true)
335+
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body, bool mandatory, bool? copyBody = null)
336336
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
337337
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody);
338338

339-
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory, bool copyBody = true)
339+
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory, bool? copyBody = null)
340340
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
341341
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody);
342342

343-
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body, bool mandatory, bool copyBody = true)
343+
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body, bool mandatory, bool? copyBody = null)
344344
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
345345
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory, copyBody);
346346

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ protected void ChannelSend<TMethod, THeader>(in TMethod method, in THeader heade
508508
}
509509

510510
[MethodImpl(MethodImplOptions.AggressiveInlining)]
511-
protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlySequence<byte> body, bool copyBody)
511+
protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlySequence<byte> body, bool? copyBody = null)
512512
where TMethod : struct, IOutgoingAmqpMethod
513513
where THeader : IAmqpHeader
514514
{
@@ -1259,7 +1259,7 @@ public void BasicPublish<TProperties>(CachedString exchange, CachedString routin
12591259
ChannelSend(in cmd, in basicProperties, body);
12601260
}
12611261

1262-
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body, bool mandatory, bool copyBody = true)
1262+
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body, bool mandatory, bool? copyBody = null)
12631263
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
12641264
{
12651265
if (NextPublishSeqNo > 0)
@@ -1274,13 +1274,13 @@ public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingK
12741274
return ModelSendAsync(in cmd, in basicProperties, body, copyBody);
12751275
}
12761276

1277-
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory, bool copyBody = true)
1277+
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory, bool? copyBody = null)
12781278
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
12791279
{
12801280
return BasicPublishAsync(exchange, routingKey, in basicProperties, new ReadOnlySequence<byte>(body), mandatory, copyBody);
12811281
}
12821282

1283-
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body, bool mandatory, bool copyBody = true)
1283+
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body, bool mandatory, bool? copyBody = null)
12841284
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
12851285
{
12861286
if (NextPublishSeqNo > 0)
@@ -1295,7 +1295,7 @@ public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedStr
12951295
return ModelSendAsync(in cmd, in basicProperties, body, copyBody);
12961296
}
12971297

1298-
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory, bool copyBody = true)
1298+
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory, bool? copyBody = null)
12991299
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
13001300
{
13011301
return BasicPublishAsync(exchange, routingKey, in basicProperties, new ReadOnlySequence<byte>(body), mandatory, copyBody);

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
101101

102102
public IDictionary<string, object?>? ServerProperties { get; private set; }
103103

104+
public int CopyBodyToMemoryThreshold => _config.CopyBodyToMemoryThreshold;
105+
104106
public IEnumerable<ShutdownReportEntry> ShutdownReport => _shutdownReport;
105107
private ShutdownReportEntry[] _shutdownReport = Array.Empty<ShutdownReportEntry>();
106108

projects/RabbitMQ.Client/client/impl/ISession.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,11 @@ void Transmit<TMethod, THeader>(in TMethod cmd, in THeader header, ReadOnlyMemor
9090

9191
ValueTask TransmitAsync<T>(in T cmd) where T : struct, IOutgoingAmqpMethod;
9292

93-
ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader header, ReadOnlySequence<byte> body, bool copyBody = true)
93+
ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader header, ReadOnlySequence<byte> body, bool? copyBody = null)
9494
where TMethod : struct, IOutgoingAmqpMethod
9595
where THeader : IAmqpHeader;
9696

97-
ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader header, ReadOnlyMemory<byte> body, bool copyBody = true)
97+
ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader header, ReadOnlyMemory<byte> body, bool? copyBody = null)
9898
where TMethod : struct, IOutgoingAmqpMethod
9999
where THeader : IAmqpHeader;
100100
}

projects/RabbitMQ.Client/client/impl/SessionBase.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public void Transmit<TMethod, THeader>(in TMethod cmd, in THeader header, ReadOn
171171
Transmit(cmd, header, new ReadOnlySequence<byte>(body));
172172
}
173173

174-
public ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader header, ReadOnlySequence<byte> body, bool copyBody = true)
174+
public ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader header, ReadOnlySequence<byte> body, bool? copyBody = null)
175175
where TMethod : struct, IOutgoingAmqpMethod
176176
where THeader : IAmqpHeader
177177
{
@@ -180,10 +180,12 @@ public ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader head
180180
ThrowAlreadyClosedException();
181181
}
182182

183-
return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize, copyBody));
183+
copyBody ??= body.Length >= Connection.CopyBodyToMemoryThreshold;
184+
185+
return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize, copyBody.Value));
184186
}
185187

186-
public ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader header, ReadOnlyMemory<byte> body, bool copyBody = true)
188+
public ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader header, ReadOnlyMemory<byte> body, bool? copyBody = null)
187189
where TMethod : struct, IOutgoingAmqpMethod
188190
where THeader : IAmqpHeader
189191
{

0 commit comments

Comments
 (0)