Skip to content

Add ability to specify maximum message size #1220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ public IProtocol Protocol
/// </summary>
public SslOption Ssl { get; set; }

/// <summary>
/// Set the maximum size for a message in bytes. The default value is 0 (unlimited)
/// </summary>
public uint MaxMessageSize { get; set; }

/// <summary>
/// Construct an instance from a protocol and an address in "hostname:port" format.
/// </summary>
Expand Down
8 changes: 6 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private static void ProcessProtocolHeader(Stream reader)
}
}

internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer, ArrayPool<byte> pool)
internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer, ArrayPool<byte> pool, uint maxMessageSize)
{
int type = default;
try
Expand Down Expand Up @@ -239,7 +239,11 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer, A

reader.Read(frameHeaderBuffer, 0, frameHeaderBuffer.Length);
int channel = NetworkOrderDeserializer.ReadUInt16(new ReadOnlySpan<byte>(frameHeaderBuffer));
int payloadSize = NetworkOrderDeserializer.ReadInt32(new ReadOnlySpan<byte>(frameHeaderBuffer, 2, 4)); // FIXME - throw exn on unreasonable value
int payloadSize = NetworkOrderDeserializer.ReadInt32(new ReadOnlySpan<byte>(frameHeaderBuffer, 2, 4));
if ((maxMessageSize > 0) && (payloadSize > maxMessageSize))
{
throw new MalformedFrameException($"Frame payload size '{payloadSize}' exceeds maximum of '{maxMessageSize}' bytes");
}

const int EndMarkerLength = 1;
// Is returned by InboundFrame.Dispose in Connection.MainLoopIteration
Expand Down
11 changes: 8 additions & 3 deletions projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public static async Task TimeoutAfter(this Task task, TimeSpan timeout)

class SocketFrameHandler : IFrameHandler
{
private readonly AmqpTcpEndpoint _endpoint;
// Socket poll timeout in ms. If the socket does not
// become writeable in this amount of time, we throw
// an exception.
Expand All @@ -82,7 +83,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
Func<AddressFamily, ITcpClient> socketFactory,
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
{
Endpoint = endpoint;
_endpoint = endpoint;
_frameHeaderBuffer = new byte[6];
var channel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>(
new UnboundedChannelOptions
Expand Down Expand Up @@ -135,7 +136,11 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
WriteTimeout = writeTimeout;
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
}
public AmqpTcpEndpoint Endpoint { get; set; }

public AmqpTcpEndpoint Endpoint
{
get { return _endpoint; }
}

internal ArrayPool<byte> MemoryPool
{
Expand Down Expand Up @@ -229,7 +234,7 @@ public void Close()

public InboundFrame ReadFrame()
{
return InboundFrame.ReadFrom(_reader, _frameHeaderBuffer, MemoryPool);
return InboundFrame.ReadFrom(_reader, _frameHeaderBuffer, MemoryPool, _endpoint.MaxMessageSize);
}

public void SendHeader()
Expand Down
1 change: 1 addition & 0 deletions projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace RabbitMQ.Client
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl) { }
public System.Net.Sockets.AddressFamily AddressFamily { get; set; }
public string HostName { get; set; }
public uint MaxMessageSize { get; set; }
public int Port { get; set; }
public RabbitMQ.Client.IProtocol Protocol { get; }
public RabbitMQ.Client.SslOption Ssl { get; set; }
Expand Down