Skip to content

Commit ceb9caf

Browse files
committed
Fix loops and IO frame reading
Fixes #11
1 parent 7af1936 commit ceb9caf

File tree

3 files changed

+155
-29
lines changed

3 files changed

+155
-29
lines changed

src/main/java/com/rabbitmq/client/impl/SocketChannelFrameHandler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ public void flush() throws IOException {
9696

9797
@Override
9898
public void close() {
99-
state.getReadSelectionKey().cancel();
100-
state.getWriteSelectionKey().cancel();
99+
//state.getReadSelectionKey().cancel();
100+
//state.getWriteSelectionKey().cancel();
101101
try {
102102
state.getChannel().close();
103103
} catch (IOException e) {

src/main/java/com/rabbitmq/client/impl/SocketChannelFrameHandlerFactory.java

+100-7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.rabbitmq.client.AMQP;
1919
import com.rabbitmq.client.Address;
20+
import com.rabbitmq.client.MalformedFrameException;
2021
import com.rabbitmq.client.SocketConfigurator;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
@@ -127,9 +128,12 @@ public void run() {
127128
Selector selector = state.selector;
128129
// FIXME find a better default?
129130
ByteBuffer buffer = ByteBuffer.allocate(8192);
130-
131131
try {
132132
while(true) {
133+
int select = selector.select();
134+
135+
// registrations should be done after select,
136+
// once the cancelled keys have been actually removed
133137
RegistrationState registration;
134138
while((registration = state.statesToBeRegistered.poll()) != null) {
135139
int operations = registration.operations;
@@ -139,7 +143,6 @@ public void run() {
139143
registration.done();
140144
}
141145

142-
int select = selector.select();
143146
if (select > 0) {
144147
Set<SelectionKey> readyKeys = selector.selectedKeys();
145148
Iterator<SelectionKey> iterator = readyKeys.iterator();
@@ -150,12 +153,102 @@ public void run() {
150153
SocketChannel channel = (SocketChannel) key.channel();
151154
if (key.isReadable()) {
152155
SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) key.attachment();
156+
153157
channel.read(buffer);
154158
buffer.flip();
155159
// FIXME handle partial frame
156160
while(buffer.hasRemaining()) {
157-
Frame frame = Frame.readFrom(channel, buffer);
161+
// FIXME make frame read better
162+
int type;
163+
int channelHeader;
164+
165+
if(!buffer.hasRemaining()) {
166+
buffer.clear();
167+
channel.read(buffer);
168+
buffer.flip();
169+
}
170+
171+
type = buffer.get() & 0xff;
172+
173+
if(!buffer.hasRemaining()) {
174+
buffer.clear();
175+
channel.read(buffer);
176+
buffer.flip();
177+
}
178+
179+
int ch1 = buffer.get() & 0xff;
180+
181+
182+
if(!buffer.hasRemaining()) {
183+
buffer.clear();
184+
channel.read(buffer);
185+
buffer.flip();
186+
}
187+
int ch2 = buffer.get() & 0xff;
188+
189+
channelHeader = (ch1 << 8) + (ch2 << 0);
190+
191+
if(!buffer.hasRemaining()) {
192+
buffer.clear();
193+
channel.read(buffer);
194+
buffer.flip();
195+
}
196+
byte b3 = buffer.get();
197+
if(!buffer.hasRemaining()) {
198+
buffer.clear();
199+
channel.read(buffer);
200+
buffer.flip();
201+
}
202+
byte b2 = buffer.get();
203+
if(!buffer.hasRemaining()) {
204+
buffer.clear();
205+
channel.read(buffer);
206+
buffer.flip();
207+
}
208+
byte b1 = buffer.get();
209+
if(!buffer.hasRemaining()) {
210+
buffer.clear();
211+
channel.read(buffer);
212+
buffer.flip();
213+
}
214+
byte b0 = buffer.get();
215+
216+
int payloadSize = (((b3 ) << 24) |
217+
((b2 & 0xff) << 16) |
218+
((b1 & 0xff) << 8) |
219+
((b0 & 0xff) ));
220+
221+
222+
byte[] payload = new byte[payloadSize];
223+
if(payloadSize > buffer.remaining()) {
224+
int remaining = buffer.remaining();
225+
buffer.get(payload, 0, remaining);
226+
buffer.clear();
227+
channel.read(buffer);
228+
buffer.flip();
229+
buffer.get(payload, remaining, payloadSize - remaining);
230+
} else {
231+
buffer.get(payload);
232+
}
233+
234+
if(!buffer.hasRemaining()) {
235+
buffer.clear();
236+
channel.read(buffer);
237+
buffer.flip();
238+
}
239+
int frameEndMarker = buffer.get() & 0xff;
240+
if (frameEndMarker != AMQP.FRAME_END) {
241+
throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
242+
}
243+
Frame frame = new Frame(type, channelHeader, payload);
158244
state.addReadFrame(frame);
245+
246+
if(!buffer.hasRemaining()) {
247+
buffer.clear();
248+
channel.read(buffer);
249+
buffer.flip();
250+
}
251+
159252
}
160253
buffer.clear();
161254
}
@@ -165,9 +258,7 @@ public void run() {
165258
} catch(Exception e) {
166259
LOGGER.error("Error in read loop", e);
167260
}
168-
169261
}
170-
171262
}
172263

173264
private static class WriteLoop implements Runnable {
@@ -186,16 +277,18 @@ public void run() {
186277

187278
try {
188279
while(true) {
280+
int select = selector.select();
281+
282+
// registrations should be done after select,
283+
// once the cancelled keys have been actually removed
189284
RegistrationState registration;
190285
while((registration = state.statesToBeRegistered.poll()) != null) {
191286
int operations = registration.operations;
192287
SelectionKey writeKey = registration.state.getChannel().register(selector, operations);
193288
writeKey.attach(registration.state);
194-
registration.state.setWriteSelectionKey(writeKey);
195289
registration.done();
196290
}
197291

198-
int select = selector.select();
199292
if(select > 0) {
200293
Set<SelectionKey> readyKeys = selector.selectedKeys();
201294
Iterator<SelectionKey> iterator = readyKeys.iterator();

src/test/java/com/rabbitmq/client/test/JavaNioTest.java

+53-20
Original file line numberDiff line numberDiff line change
@@ -15,34 +15,67 @@
1515
*/
1616
public class JavaNioTest {
1717

18-
@Test public void connection() throws IOException, TimeoutException, InterruptedException {
18+
@Test
19+
public void connection() throws IOException, TimeoutException, InterruptedException {
20+
CountDownLatch latch = new CountDownLatch(1);
1921
ConnectionFactory connectionFactory = new ConnectionFactory();
2022
connectionFactory.setNio(true);
21-
Connection connection = connectionFactory.newConnection();
23+
Connection connection = null;
2224
try {
23-
Channel channel = connection.createChannel();
24-
String queue = "nio.queue";
25-
channel.queueDeclare(queue, false, false, false, null);
26-
channel.queuePurge(queue);
27-
28-
channel.basicPublish("", queue, null, "hello nio world!".getBytes("UTF-8"));
29-
30-
final CountDownLatch latch = new CountDownLatch(1);
31-
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
32-
@Override
33-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
34-
latch.countDown();
35-
}
36-
});
25+
connection = basicGetBasicConsume(connectionFactory, "nio.queue", latch);
26+
boolean messagesReceived = latch.await(5, TimeUnit.SECONDS);
27+
assertTrue("Message has not been received", messagesReceived);
28+
} finally {
29+
safeClose(connection);
30+
}
31+
}
3732

38-
boolean messageReceived = latch.await(5, TimeUnit.SECONDS);
39-
assertTrue("Message has not been received", messageReceived);
33+
@Test
34+
public void twoConnections() throws IOException, TimeoutException, InterruptedException {
35+
CountDownLatch latch = new CountDownLatch(2);
36+
ConnectionFactory connectionFactory = new ConnectionFactory();
37+
connectionFactory.setNio(true);
38+
Connection connection1 = null;
39+
Connection connection2 = null;
40+
try {
41+
connection1 = basicGetBasicConsume(connectionFactory, "nio.queue.1", latch);
42+
connection2 = basicGetBasicConsume(connectionFactory, "nio.queue.2", latch);
4043

41-
channel.close();
44+
boolean messagesReceived = latch.await(5, TimeUnit.SECONDS);
45+
assertTrue("Messages have not been received", messagesReceived);
4246
} finally {
43-
connection.close();
47+
safeClose(connection1);
48+
safeClose(connection2);
4449
}
50+
}
51+
52+
private Connection basicGetBasicConsume(ConnectionFactory connectionFactory, String queue, final CountDownLatch latch)
53+
throws IOException, TimeoutException {
54+
Connection connection = connectionFactory.newConnection();
55+
Channel channel = connection.createChannel();
56+
channel.queueDeclare(queue, false, false, false, null);
57+
channel.queuePurge(queue);
58+
59+
channel.basicPublish("", queue, null, "hello nio world!".getBytes("UTF-8"));
4560

61+
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
62+
@Override
63+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
64+
latch.countDown();
65+
}
66+
});
67+
68+
return connection;
69+
}
70+
71+
private void safeClose(Connection connection) {
72+
if(connection != null) {
73+
try {
74+
connection.abort();
75+
} catch (Exception e) {
76+
// OK
77+
}
78+
}
4679
}
4780

4881
}

0 commit comments

Comments
 (0)