Skip to content

Commit fd95377

Browse files
authored
Merge pull request #1218 from rabbitmq/rabbitmq-dotnet-client-1217
Add ability to specify maximum message size
2 parents d82e383 + 6da0f20 commit fd95377

File tree

5 files changed

+40
-6
lines changed

5 files changed

+40
-6
lines changed

projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class AmqpTcpEndpoint// : ICloneable
6161
public const int UseDefaultPort = -1;
6262

6363
private int _port;
64+
private uint _maxMessageSize = Constants.DefaultMaxMessageSizeInBytes;
6465

6566
/// <summary>
6667
/// Creates a new instance of the <see cref="AmqpTcpEndpoint"/>.
@@ -177,6 +178,25 @@ public IProtocol Protocol
177178
/// </summary>
178179
public SslOption Ssl { get; set; }
179180

181+
/// <summary>
182+
/// Set the maximum size for a message in bytes. Setting it to 0 reverts to the default of 128MiB
183+
/// </summary>
184+
public uint MaxMessageSize
185+
{
186+
get { return _maxMessageSize; }
187+
set
188+
{
189+
if (value == default(uint))
190+
{
191+
_maxMessageSize = Constants.DefaultMaxMessageSizeInBytes;
192+
}
193+
else
194+
{
195+
_maxMessageSize = value;
196+
}
197+
}
198+
}
199+
180200
/// <summary>
181201
/// Construct an instance from a protocol and an address in "hostname:port" format.
182202
/// </summary>

projects/RabbitMQ.Client/client/framing/Constants.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,8 @@ public static class Constants
8181
public const int NotImplemented = 540;
8282
///<summary>(= 541)</summary>
8383
public const int InternalError = 541;
84+
85+
///<summary>(= 134217728)</summary>
86+
public const uint DefaultMaxMessageSizeInBytes = 134217728;
8487
}
8588
}

projects/RabbitMQ.Client/client/impl/Frame.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ private static void ProcessProtocolHeader(Stream reader, ReadOnlySpan<byte> fram
246246
}
247247
}
248248

249-
internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
249+
internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer, uint maxMessageSize)
250250
{
251251
try
252252
{
@@ -277,7 +277,11 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
277277
FrameType type = (FrameType)firstByte;
278278
var frameHeaderSpan = new ReadOnlySpan<byte>(frameHeaderBuffer, 1, 6);
279279
int channel = NetworkOrderDeserializer.ReadUInt16(frameHeaderSpan);
280-
int payloadSize = NetworkOrderDeserializer.ReadInt32(frameHeaderSpan.Slice(2, 4)); // FIXME - throw exn on unreasonable value
280+
int payloadSize = NetworkOrderDeserializer.ReadInt32(frameHeaderSpan.Slice(2, 4));
281+
if (payloadSize > maxMessageSize)
282+
{
283+
throw new MalformedFrameException($"Frame payload size '{payloadSize}' exceeds maximum of '{maxMessageSize}' bytes");
284+
}
281285

282286
const int EndMarkerLength = 1;
283287
// Is returned by InboundFrame.ReturnPayload in Connection.MainLoopIteration

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public static async Task TimeoutAfter(this Task task, TimeSpan timeout)
6565

6666
internal sealed class SocketFrameHandler : IFrameHandler
6767
{
68+
private readonly AmqpTcpEndpoint _amqpTcpEndpoint;
6869
private readonly ITcpClient _socket;
6970
private readonly Stream _reader;
7071
private readonly Stream _writer;
@@ -79,7 +80,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
7980
Func<AddressFamily, ITcpClient> socketFactory,
8081
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
8182
{
82-
Endpoint = endpoint;
83+
_amqpTcpEndpoint = endpoint;
8384
_frameHeaderBuffer = new byte[7];
8485
var channel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>(
8586
new UnboundedChannelOptions
@@ -149,7 +150,11 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
149150
WriteTimeout = writeTimeout;
150151
_writerTask = Task.Run(WriteLoop);
151152
}
152-
public AmqpTcpEndpoint Endpoint { get; set; }
153+
154+
public AmqpTcpEndpoint Endpoint
155+
{
156+
get { return _amqpTcpEndpoint; }
157+
}
153158

154159
public EndPoint LocalEndPoint
155160
{
@@ -235,7 +240,7 @@ public void Close()
235240

236241
public InboundFrame ReadFrame()
237242
{
238-
return InboundFrame.ReadFrom(_reader, _frameHeaderBuffer);
243+
return InboundFrame.ReadFrom(_reader, _frameHeaderBuffer, _amqpTcpEndpoint.MaxMessageSize);
239244
}
240245

241246
public void SendHeader()

projects/Unit/APIApproval.Approve.verified.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"Benchmarks, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5")]
1+
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"Benchmarks, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5")]
22
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"Unit, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5")]
33
namespace RabbitMQ.Client
44
{
@@ -13,6 +13,7 @@ namespace RabbitMQ.Client
1313
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl) { }
1414
public System.Net.Sockets.AddressFamily AddressFamily { get; set; }
1515
public string HostName { get; set; }
16+
public uint MaxMessageSize { get; set; }
1617
public int Port { get; set; }
1718
public RabbitMQ.Client.IProtocol Protocol { get; }
1819
public RabbitMQ.Client.SslOption Ssl { get; set; }
@@ -207,6 +208,7 @@ namespace RabbitMQ.Client
207208
public const int CommandInvalid = 503;
208209
public const int ConnectionForced = 320;
209210
public const int ContentTooLarge = 311;
211+
public const uint DefaultMaxMessageSizeInBytes = 134217728u;
210212
public const int FrameBody = 3;
211213
public const int FrameEnd = 206;
212214
public const int FrameError = 501;

0 commit comments

Comments
 (0)