Skip to content

Commit f695768

Browse files
committed
Set a limit in write queue
The current limit may be arbitrary right now (depends on the hardware and the scenario). Write as much as possible in the ByteBuffer before clearing it. Fixes #11
1 parent 482449a commit f695768

File tree

4 files changed

+44
-33
lines changed

4 files changed

+44
-33
lines changed

src/main/java/com/rabbitmq/client/impl/Frame.java

+16-12
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,19 @@
1515

1616
package com.rabbitmq.client.impl;
1717

18-
import java.io.ByteArrayInputStream;
19-
import java.io.ByteArrayOutputStream;
20-
import java.io.DataInputStream;
21-
import java.io.DataOutputStream;
22-
import java.io.IOException;
23-
import java.io.UnsupportedEncodingException;
24-
import java.lang.reflect.Field;
18+
import com.rabbitmq.client.AMQP;
19+
import com.rabbitmq.client.LongString;
20+
import com.rabbitmq.client.MalformedFrameException;
21+
22+
import java.io.*;
2523
import java.math.BigDecimal;
2624
import java.net.SocketTimeoutException;
2725
import java.nio.ByteBuffer;
2826
import java.nio.channels.SocketChannel;
2927
import java.sql.Timestamp;
3028
import java.util.Date;
31-
import java.util.Map;
3229
import java.util.List;
33-
34-
import com.rabbitmq.client.AMQP;
35-
import com.rabbitmq.client.LongString;
36-
import com.rabbitmq.client.MalformedFrameException;
30+
import java.util.Map;
3731

3832
/**
3933
* Represents an AMQP wire-protocol frame, with frame type, channel number, and payload bytes.
@@ -52,6 +46,8 @@ public class Frame {
5246
/** Frame payload (for outbound frames) */
5347
private final ByteArrayOutputStream accumulator;
5448

49+
private static final int NON_BODY_SIZE = 1 /* type */ + 2 /* channel */ + 1 /* end character */;
50+
5551
/**
5652
* Constructs a frame for output with a type and a channel number and a
5753
* fresh accumulator waiting for payload.
@@ -243,6 +239,14 @@ public void writeTo(SocketChannel socketChannel, ByteBuffer buffer) throws IOExc
243239
while(buffer.hasRemaining() && socketChannel.write(buffer) != -1);
244240
}
245241

242+
public int size() {
243+
if(accumulator != null) {
244+
return accumulator.size() + NON_BODY_SIZE;
245+
} else {
246+
return payload.length + NON_BODY_SIZE;
247+
}
248+
}
249+
246250
/**
247251
* Public API - retrieves the frame payload
248252
*/

src/main/java/com/rabbitmq/client/impl/SocketChannelFrameHandlerFactory.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.net.InetSocketAddress;
2828
import java.net.SocketAddress;
2929
import java.nio.ByteBuffer;
30-
import java.nio.channels.ClosedChannelException;
3130
import java.nio.channels.SelectionKey;
3231
import java.nio.channels.Selector;
3332
import java.nio.channels.SocketChannel;
@@ -121,7 +120,11 @@ public void run() {
121120
while(true) {
122121
int select;
123122
if(state.statesToBeRegistered.isEmpty()) {
124-
// we can block, registration will Selector.wakeup()
123+
// we can block, registration will call Selector.wakeup()
124+
125+
// FIXME check the number of keys and stop the read and write loops
126+
// if there's no read key anymore
127+
125128
select = selector.select();
126129
} else {
127130
// we cannot block, we need to select and clean cancelled keys before registration
@@ -251,8 +254,8 @@ public void run() {
251254

252255
}
253256

254-
} catch (ClosedChannelException e) {
255-
LOGGER.warn("Error in read loop because of async close");
257+
} catch (Exception e) {
258+
LOGGER.warn("Error during reading frames: "+e.getMessage());
256259
} finally {
257260
buffer.clear();
258261
}
@@ -287,7 +290,7 @@ public void run() {
287290
while(true) {
288291
int select;
289292
if(state.statesToBeRegistered.isEmpty()) {
290-
// we can block, registration will Selector.wakeup()
293+
// we can block, registration will call Selector.wakeup()
291294
select = selector.select();
292295
} else {
293296
// we cannot block, we need to select and clean cancelled keys before registration
@@ -330,9 +333,12 @@ public void run() {
330333

331334
Frame frame;
332335
while((frame = state.getWriteQueue().poll()) != null) {
336+
if(buffer.remaining() < frame.size()) {
337+
buffer.clear();
338+
}
333339
frame.writeTo(channel, buffer);
334-
buffer.clear();
335340
}
341+
buffer.clear();
336342
}
337343
key.cancel();
338344
}

src/main/java/com/rabbitmq/client/impl/SocketChannelFrameHandlerState.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,25 @@
11
package com.rabbitmq.client.impl;
22

3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
36
import java.nio.channels.SelectionKey;
47
import java.nio.channels.SocketChannel;
58
import java.util.Queue;
9+
import java.util.concurrent.ArrayBlockingQueue;
610
import java.util.concurrent.BlockingQueue;
7-
import java.util.concurrent.LinkedBlockingQueue;
811

912
/**
1013
*
1114
*/
1215
public class SocketChannelFrameHandlerState {
1316

14-
private final SocketChannel channel;
17+
private static final Logger LOGGER = LoggerFactory.getLogger(SocketChannelFrameHandlerState.class);
1518

16-
private final Queue<Frame> writeQueue = new LinkedBlockingQueue<Frame>();
19+
private final SocketChannel channel;
1720

18-
private final BlockingQueue<Frame> readQueue = new LinkedBlockingQueue<Frame>();
21+
// FIXME find appropriate default for limit in write queue
22+
private final BlockingQueue<Frame> writeQueue = new ArrayBlockingQueue<Frame>(1000);
1923

2024
private volatile AMQConnection connection;
2125

@@ -32,14 +36,6 @@ public SocketChannel getChannel() {
3236
return channel;
3337
}
3438

35-
public void addReadFrame(Frame frame) {
36-
this.readQueue.add(frame);
37-
}
38-
39-
public BlockingQueue<Frame> getReadQueue() {
40-
return readQueue;
41-
}
42-
4339
public Queue<Frame> getWriteQueue() {
4440
return writeQueue;
4541
}
@@ -56,8 +52,12 @@ public void setSendHeader(boolean sendHeader) {
5652
}
5753

5854
public void write(Frame frame) {
59-
this.writeQueue.add(frame);
60-
this.selectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE);
55+
try {
56+
this.writeQueue.put(frame);
57+
this.selectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE);
58+
} catch (InterruptedException e) {
59+
LOGGER.warn("Thread interrupted during enqueuing frame in write queue");
60+
}
6161
}
6262

6363
public AMQConnection getConnection() {

src/test/java/com/rabbitmq/client/test/JavaNioTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,10 @@ private Connection basicGetBasicConsume(ConnectionFactory connectionFactory, Str
5858

5959
channel.basicPublish("", queue, null, "hello nio world!".getBytes("UTF-8"));
6060

61-
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
61+
channel.basicConsume(queue, false, new DefaultConsumer(channel) {
6262
@Override
6363
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
64+
getChannel().basicAck(envelope.getDeliveryTag(), false);
6465
latch.countDown();
6566
}
6667
});

0 commit comments

Comments
 (0)