Skip to content

Commit 9ed45fd

Browse files
committed
Add max inbound message size to ConnectionFactory
To avoid OOM with a very large message. The default value is 64 MiB. Fixes #1062
1 parent ab0c62d commit 9ed45fd

17 files changed

+282
-48
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

+30-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -203,6 +203,13 @@ public class ConnectionFactory implements Cloneable {
203203

204204
private CredentialsRefreshService credentialsRefreshService;
205205

206+
/**
207+
* Maximum body size of inbound (received) messages in bytes.
208+
*
209+
* <p>Default value is 67,108,864 (64 MiB).
210+
*/
211+
private int maxInboundMessageBodySize = 1_048_576 * 64;
212+
206213
/** @return the default host to use for connections */
207214
public String getHost() {
208215
return host;
@@ -997,11 +1004,15 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO
9971004
if(this.nioParams.getNioExecutor() == null && this.nioParams.getThreadFactory() == null) {
9981005
this.nioParams.setThreadFactory(getThreadFactory());
9991006
}
1000-
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(connectionTimeout, nioParams, isSSL(), sslContextFactory);
1007+
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(
1008+
connectionTimeout, nioParams, isSSL(), sslContextFactory,
1009+
this.maxInboundMessageBodySize);
10011010
}
10021011
return this.frameHandlerFactory;
10031012
} else {
1004-
return new SocketFrameHandlerFactory(connectionTimeout, socketFactory, socketConf, isSSL(), this.shutdownExecutor, sslContextFactory);
1013+
return new SocketFrameHandlerFactory(connectionTimeout, socketFactory,
1014+
socketConf, isSSL(), this.shutdownExecutor, sslContextFactory,
1015+
this.maxInboundMessageBodySize);
10051016
}
10061017

10071018
}
@@ -1300,6 +1311,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
13001311
result.setRecoveredQueueNameSupplier(recoveredQueueNameSupplier);
13011312
result.setTrafficListener(trafficListener);
13021313
result.setCredentialsRefreshService(credentialsRefreshService);
1314+
result.setMaxInboundMessageBodySize(maxInboundMessageBodySize);
13031315
return result;
13041316
}
13051317

@@ -1590,6 +1602,21 @@ public int getChannelRpcTimeout() {
15901602
return channelRpcTimeout;
15911603
}
15921604

1605+
/**
1606+
* Maximum body size of inbound (received) messages in bytes.
1607+
*
1608+
* <p>Default value is 67,108,864 (64 MiB).
1609+
*
1610+
* @param maxInboundMessageBodySize the maximum size of inbound messages
1611+
*/
1612+
public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize) {
1613+
if (maxInboundMessageBodySize <= 0) {
1614+
throw new IllegalArgumentException("Max inbound message body size must be greater than 0: "
1615+
+ maxInboundMessageBodySize);
1616+
}
1617+
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
1618+
}
1619+
15931620
/**
15941621
* The factory to create SSL contexts.
15951622
* This provides more flexibility to create {@link SSLContext}s

src/main/java/com/rabbitmq/client/impl/AMQChannel.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -62,7 +62,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
6262
private final int _channelNumber;
6363

6464
/** Command being assembled */
65-
private AMQCommand _command = new AMQCommand();
65+
private AMQCommand _command;
6666

6767
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
6868
private RpcWrapper _activeRpc = null;
@@ -76,6 +76,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
7676
private final boolean _checkRpcResponseType;
7777

7878
private final TrafficListener _trafficListener;
79+
private final int maxInboundMessageBodySize;
7980

8081
/**
8182
* Construct a channel on the given connection, with the given channel number.
@@ -91,6 +92,8 @@ public AMQChannel(AMQConnection connection, int channelNumber) {
9192
this._rpcTimeout = connection.getChannelRpcTimeout();
9293
this._checkRpcResponseType = connection.willCheckRpcResponseType();
9394
this._trafficListener = connection.getTrafficListener();
95+
this.maxInboundMessageBodySize = connection.getMaxInboundMessageBodySize();
96+
this._command = new AMQCommand(this.maxInboundMessageBodySize);
9497
}
9598

9699
/**
@@ -110,7 +113,7 @@ public int getChannelNumber() {
110113
void handleFrame(Frame frame) throws IOException {
111114
AMQCommand command = _command;
112115
if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
113-
_command = new AMQCommand(); // prepare for the next one
116+
_command = new AMQCommand(this.maxInboundMessageBodySize); // prepare for the next one
114117
handleCompleteInboundCommand(command);
115118
}
116119
}

src/main/java/com/rabbitmq/client/impl/AMQCommand.java

+20-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -44,17 +44,21 @@ public class AMQCommand implements Command {
4444
/** The assembler for this command - synchronised on - contains all the state */
4545
private final CommandAssembler assembler;
4646

47+
AMQCommand(int maxBodyLength) {
48+
this(null, null, null, maxBodyLength);
49+
}
50+
4751
/** Construct a command ready to fill in by reading frames */
4852
public AMQCommand() {
49-
this(null, null, null);
53+
this(null, null, null, Integer.MAX_VALUE);
5054
}
5155

5256
/**
5357
* Construct a command with just a method, and without header or body.
5458
* @param method the wrapped method
5559
*/
5660
public AMQCommand(com.rabbitmq.client.Method method) {
57-
this(method, null, null);
61+
this(method, null, null, Integer.MAX_VALUE);
5862
}
5963

6064
/**
@@ -64,7 +68,19 @@ public AMQCommand(com.rabbitmq.client.Method method) {
6468
* @param body the message body data
6569
*/
6670
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body) {
67-
this.assembler = new CommandAssembler((Method) method, contentHeader, body);
71+
this.assembler = new CommandAssembler((Method) method, contentHeader, body, Integer.MAX_VALUE);
72+
}
73+
74+
/**
75+
* Construct a command with a specified method, header and body.
76+
* @param method the wrapped method
77+
* @param contentHeader the wrapped content header
78+
* @param body the message body data
79+
* @param maxBodyLength the maximum size for an inbound message body
80+
*/
81+
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body,
82+
int maxBodyLength) {
83+
this.assembler = new CommandAssembler((Method) method, contentHeader, body, maxBodyLength);
6884
}
6985

7086
/** Public API - {@inheritDoc} */

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

+7
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ public static Map<String, Object> defaultClientProperties() {
157157
private volatile ChannelManager _channelManager;
158158
/** Saved server properties field from connection.start */
159159
private volatile Map<String, Object> _serverProperties;
160+
private final int maxInboundMessageBodySize;
160161

161162
/**
162163
* Protected API - respond, in the main I/O loop thread, to a ShutdownSignal.
@@ -244,6 +245,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
244245

245246
this.credentialsRefreshService = params.getCredentialsRefreshService();
246247

248+
247249
this._channel0 = createChannel0();
248250

249251
this._channelManager = null;
@@ -257,6 +259,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
257259
this.errorOnWriteListener = params.getErrorOnWriteListener() != null ? params.getErrorOnWriteListener() :
258260
(connection, exception) -> { throw exception; }; // we just propagate the exception for non-recoverable connections
259261
this.workPoolTimeout = params.getWorkPoolTimeout();
262+
this.maxInboundMessageBodySize = params.getMaxInboundMessageBodySize();
260263
}
261264

262265
AMQChannel createChannel0() {
@@ -1191,4 +1194,8 @@ public boolean willCheckRpcResponseType() {
11911194
public TrafficListener getTrafficListener() {
11921195
return trafficListener;
11931196
}
1197+
1198+
int getMaxInboundMessageBodySize() {
1199+
return maxInboundMessageBodySize;
1200+
}
11941201
}

src/main/java/com/rabbitmq/client/impl/AbstractFrameHandlerFactory.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
// Copyright (c) 2016-2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
116
package com.rabbitmq.client.impl;
217

318
import com.rabbitmq.client.SocketConfigurator;
@@ -10,10 +25,13 @@ public abstract class AbstractFrameHandlerFactory implements FrameHandlerFactory
1025
protected final int connectionTimeout;
1126
protected final SocketConfigurator configurator;
1227
protected final boolean ssl;
28+
protected final int maxInboundMessageBodySize;
1329

14-
protected AbstractFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator, boolean ssl) {
30+
protected AbstractFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator,
31+
boolean ssl, int maxInboundMessageBodySize) {
1532
this.connectionTimeout = connectionTimeout;
1633
this.configurator = configurator;
1734
this.ssl = ssl;
35+
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
1836
}
1937
}

src/main/java/com/rabbitmq/client/impl/CommandAssembler.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -21,6 +21,7 @@
2121

2222
import com.rabbitmq.client.AMQP;
2323
import com.rabbitmq.client.UnexpectedFrameError;
24+
import static java.lang.String.format;
2425

2526
/**
2627
* Class responsible for piecing together a command from a series of {@link Frame}s.
@@ -52,12 +53,16 @@ private enum CAState {
5253
/** No bytes of content body not yet accumulated */
5354
private long remainingBodyBytes;
5455

55-
public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body) {
56+
private final int maxBodyLength;
57+
58+
public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body,
59+
int maxBodyLength) {
5660
this.method = method;
5761
this.contentHeader = contentHeader;
58-
this.bodyN = new ArrayList<byte[]>(2);
62+
this.bodyN = new ArrayList<>(2);
5963
this.bodyLength = 0;
6064
this.remainingBodyBytes = 0;
65+
this.maxBodyLength = maxBodyLength;
6166
appendBodyFragment(body);
6267
if (method == null) {
6368
this.state = CAState.EXPECTING_METHOD;
@@ -99,7 +104,14 @@ private void consumeMethodFrame(Frame f) throws IOException {
99104
private void consumeHeaderFrame(Frame f) throws IOException {
100105
if (f.getType() == AMQP.FRAME_HEADER) {
101106
this.contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream());
102-
this.remainingBodyBytes = this.contentHeader.getBodySize();
107+
long bodySize = this.contentHeader.getBodySize();
108+
if (bodySize >= this.maxBodyLength) {
109+
throw new IllegalStateException(format(
110+
"Message body is too large (%d), maximum size is %d",
111+
bodySize, this.maxBodyLength
112+
));
113+
}
114+
this.remainingBodyBytes = bodySize;
103115
updateContentBodyState();
104116
} else {
105117
throw new UnexpectedFrameError(f, AMQP.FRAME_HEADER);

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -63,6 +63,8 @@ public class ConnectionParams {
6363

6464
private CredentialsRefreshService credentialsRefreshService;
6565

66+
private int maxInboundMessageBodySize;
67+
6668
public ConnectionParams() {}
6769

6870
public CredentialsProvider getCredentialsProvider() {
@@ -296,4 +298,12 @@ public void setCredentialsRefreshService(CredentialsRefreshService credentialsRe
296298
public CredentialsRefreshService getCredentialsRefreshService() {
297299
return credentialsRefreshService;
298300
}
301+
302+
public int getMaxInboundMessageBodySize() {
303+
return maxInboundMessageBodySize;
304+
}
305+
306+
public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize) {
307+
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
308+
}
299309
}

src/main/java/com/rabbitmq/client/impl/Frame.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -25,6 +25,7 @@
2525
import java.util.Date;
2626
import java.util.List;
2727
import java.util.Map;
28+
import static java.lang.String.format;
2829

2930
/**
3031
* Represents an AMQP wire-protocol frame, with frame type, channel number, and payload bytes.
@@ -81,7 +82,7 @@ public static Frame fromBodyFragment(int channelNumber, byte[] body, int offset,
8182
*
8283
* @return a new Frame if we read a frame successfully, otherwise null
8384
*/
84-
public static Frame readFrom(DataInputStream is) throws IOException {
85+
public static Frame readFrom(DataInputStream is, int maxPayloadSize) throws IOException {
8586
int type;
8687
int channel;
8788

@@ -107,6 +108,12 @@ public static Frame readFrom(DataInputStream is) throws IOException {
107108

108109
channel = is.readUnsignedShort();
109110
int payloadSize = is.readInt();
111+
if (payloadSize >= maxPayloadSize) {
112+
throw new IllegalStateException(format(
113+
"Frame body is too large (%d), maximum size is %d",
114+
payloadSize, maxPayloadSize
115+
));
116+
}
110117
byte[] payload = new byte[payloadSize];
111118
is.readFully(payload);
112119

src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -52,22 +52,26 @@ public class SocketFrameHandler implements FrameHandler {
5252
/** Socket's outputstream - data to the broker - synchronized on */
5353
private final DataOutputStream _outputStream;
5454

55+
private final int maxInboundMessageBodySize;
56+
5557
/** Time to linger before closing the socket forcefully. */
5658
public static final int SOCKET_CLOSING_TIMEOUT = 1;
5759

5860
/**
5961
* @param socket the socket to use
6062
*/
6163
public SocketFrameHandler(Socket socket) throws IOException {
62-
this(socket, null);
64+
this(socket, null, Integer.MAX_VALUE);
6365
}
6466

6567
/**
6668
* @param socket the socket to use
6769
*/
68-
public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor) throws IOException {
70+
public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor,
71+
int maxInboundMessageBodySize) throws IOException {
6972
_socket = socket;
7073
_shutdownExecutor = shutdownExecutor;
74+
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
7175

7276
_inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
7377
_outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
@@ -181,7 +185,7 @@ public void initialize(AMQConnection connection) {
181185
@Override
182186
public Frame readFrame() throws IOException {
183187
synchronized (_inputStream) {
184-
return Frame.readFrom(_inputStream);
188+
return Frame.readFrom(_inputStream, this.maxInboundMessageBodySize);
185189
}
186190
}
187191

0 commit comments

Comments
 (0)