Skip to content

Commit 1061a50

Browse files
author
Matthias Radestock
committed
merge bug22159 into default
2 parents 0bdf458 + 835095c commit 1061a50

File tree

4 files changed

+143
-27
lines changed

4 files changed

+143
-27
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
import java.util.HashMap;
3737
import java.util.Map;
3838

39+
import java.net.Socket;
40+
import java.net.InetSocketAddress;
41+
3942
import javax.net.SocketFactory;
4043
import javax.net.ssl.SSLContext;
4144
import javax.net.ssl.TrustManager;
@@ -153,7 +156,26 @@ protected FrameHandler createFrameHandler(Address addr)
153156
String hostName = addr.getHost();
154157
int portNumber = addr.getPort();
155158
if (portNumber == -1) portNumber = AMQP.PROTOCOL.PORT;
156-
return new SocketFrameHandler(_factory, hostName, portNumber);
159+
Socket socket = _factory.createSocket();
160+
configureSocket(socket);
161+
socket.connect(new InetSocketAddress(hostName, portNumber));
162+
return new SocketFrameHandler(socket);
163+
}
164+
165+
/**
166+
* Provides a hook to insert custom configuration of the sockets used
167+
* to connect to an AMQP server before they connect.
168+
*
169+
* The default behaviour of this method is to disable Nagle's algorithm to get
170+
* more consistently low latency.
171+
* However it may be overridden freely and there is no requirement to retain
172+
* this behaviour.
173+
*
174+
* @param socket The socket that is to be used for the Connection
175+
*/
176+
protected void configureSocket(Socket socket) throws IOException{
177+
// disable Nagle's algorithm, for more consistently low latency
178+
socket.setTcpNoDelay(true);
157179
}
158180

159181
private Connection newConnection(Address[] addrs,

src/com/rabbitmq/client/impl/SocketFrameHandler.java

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,6 @@
4747
*/
4848

4949
public class SocketFrameHandler implements FrameHandler {
50-
/** Host we connect to */
51-
public final String _host;
52-
53-
/** Port number we connect to */
54-
public final int _port;
55-
5650
/** The underlying socket */
5751
public final Socket _socket;
5852

@@ -62,37 +56,25 @@ public class SocketFrameHandler implements FrameHandler {
6256
/** Socket's outputstream - data to the broker */
6357
public final DataOutputStream _outputStream;
6458

65-
// Note, we use each of these to synchronize on to make sure we don't try to use them
66-
// twice simultaneously.
59+
// Note, we use each of these to synchronize on to make sure we
60+
// don't try to use them twice simultaneously.
6761

6862
/**
69-
* Instantiate a SocketFrameHandler.
70-
* @param factory the socket factory to use to build our Socket - may be SSLSocketFactory etc
71-
* @param hostName the host name
72-
* @param portNumber the port number
73-
* @throws IOException if there is a problem accessing the connection
63+
* @param socket the socket to use
7464
*/
75-
public SocketFrameHandler(SocketFactory factory,
76-
String hostName,
77-
int portNumber)
78-
throws IOException
79-
{
80-
_host = hostName;
81-
_port = portNumber;
82-
_socket = factory.createSocket(_host, _port);
83-
//disable Nagle's algorithm, for more consistently low latency
84-
_socket.setTcpNoDelay(true);
65+
public SocketFrameHandler(Socket socket) throws IOException {
66+
_socket = socket;
8567

8668
_inputStream = new DataInputStream(new BufferedInputStream(_socket.getInputStream()));
8769
_outputStream = new DataOutputStream(new BufferedOutputStream(_socket.getOutputStream()));
8870
}
8971

9072
public String getHost() {
91-
return _host;
73+
return _socket.getInetAddress().getHostName();
9274
}
9375

9476
public int getPort() {
95-
return _port;
77+
return _socket.getPort();
9678
}
9779

9880
public void setTimeout(int timeoutMs)
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package com.rabbitmq.examples;
2+
3+
import com.rabbitmq.client.ConnectionFactory;
4+
import com.rabbitmq.client.Connection;
5+
import com.rabbitmq.client.Channel;
6+
import com.rabbitmq.client.MessageProperties;
7+
import com.rabbitmq.client.Consumer;
8+
import com.rabbitmq.client.QueueingConsumer;
9+
import com.rabbitmq.client.AMQP.Queue;
10+
11+
import java.net.Socket;
12+
import java.io.IOException;
13+
import java.util.Random;
14+
15+
16+
/**
17+
* Class to explore how performance of sending and receiving messages
18+
* varies with the buffer size and enabling/disabling Nagle's
19+
* algorithm.
20+
*/
21+
public class BufferPerformanceMetrics {
22+
23+
public static final int MESSAGE_COUNT = 100000;
24+
public static final byte[] MESSAGE = "".getBytes();
25+
public static final int REPEATS = 1000000;
26+
public static final int PEAK_SIZE = 20 * 1024;
27+
28+
public static double NANOSECONDS_PER_SECOND = 1000 * 1000 * 1000;
29+
30+
public static void main(String[] args) throws Exception {
31+
String hostName = args.length > 0 ? args[0] : "localhost";
32+
33+
Random rnd = new Random();
34+
35+
System.out.println("buffer size, " +
36+
"publish rate with nagle, " +
37+
"consume rate with nagle, " +
38+
"publish rate without nagle, " +
39+
"consume rate without nagle");
40+
41+
for(int repeat = 0; repeat < REPEATS; repeat++) {
42+
final int bufferSize = 1 + rnd.nextInt(PEAK_SIZE);
43+
44+
double
45+
publishRateNagle = 0,
46+
publishRateNoNagle = 0,
47+
consumeRateNagle = 0,
48+
consumeRateNoNagle = 0;
49+
50+
for(final boolean useNagle : new boolean[] { false, true }) {
51+
ConnectionFactory factory = new ConnectionFactory() {
52+
public void configureSocket(Socket socket)
53+
throws IOException {
54+
socket.setTcpNoDelay(!useNagle);
55+
socket.setReceiveBufferSize(bufferSize);
56+
socket.setSendBufferSize(bufferSize);
57+
}
58+
};
59+
60+
Connection connection = factory.newConnection(hostName);
61+
Channel channel = connection.createChannel();
62+
Queue.DeclareOk res = channel.queueDeclare();
63+
String queueName = res.getQueue();
64+
65+
long start;
66+
67+
start = System.nanoTime();
68+
69+
for(int i = 0; i < MESSAGE_COUNT; i++) {
70+
channel.basicPublish("", queueName,
71+
MessageProperties.BASIC, MESSAGE);
72+
}
73+
74+
QueueingConsumer consumer = new QueueingConsumer(channel);
75+
channel.basicConsume(queueName, true, consumer);
76+
77+
long publishTime = System.nanoTime() - start;
78+
79+
start = System.nanoTime();
80+
81+
for(int i = 0; i < MESSAGE_COUNT; i++){
82+
consumer.nextDelivery();
83+
}
84+
85+
long consumeTime = System.nanoTime() - start;
86+
87+
double publishRate =
88+
MESSAGE_COUNT / (publishTime / NANOSECONDS_PER_SECOND);
89+
double consumeRate =
90+
MESSAGE_COUNT / (consumeTime / NANOSECONDS_PER_SECOND);
91+
92+
if(useNagle){
93+
publishRateNagle = publishRate;
94+
consumeRateNagle = consumeRate;
95+
} else {
96+
publishRateNoNagle = publishRate;
97+
consumeRateNoNagle = consumeRate;
98+
}
99+
100+
connection.close();
101+
// Small sleep to remove noise from hammering the server.
102+
Thread.sleep(100);
103+
}
104+
105+
System.out.println(bufferSize + ", " +
106+
publishRateNagle + ", " +
107+
consumeRateNagle + ", " +
108+
publishRateNoNagle + ", " +
109+
consumeRateNoNagle);
110+
}
111+
}
112+
}

test/src/com/rabbitmq/examples/TestMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ protected FrameHandler createFrameHandler(Address addr)
101101
String hostName = addr.getHost();
102102
int portNumber = addr.getPort();
103103
if (portNumber == -1) portNumber = AMQP.PROTOCOL.PORT;
104-
return new SocketFrameHandler(getSocketFactory(), hostName, portNumber) {
104+
return new SocketFrameHandler(getSocketFactory().createSocket(hostName, portNumber)) {
105105
public void sendHeader() throws IOException {
106106
sendHeader(protocolMajor, protocolMinor);
107107
}

0 commit comments

Comments
 (0)