Skip to content

Commit 88809df

Browse files
author
David R. MacIver
committed
missing test file
1 parent 665b957 commit 88809df

File tree

1 file changed

+67
-0
lines changed

1 file changed

+67
-0
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.rabbitmq.client.test;
2+
3+
import com.rabbitmq.client.impl.*;
4+
import com.rabbitmq.client.*;
5+
import java.io.IOException;
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
import java.util.concurrent.CountDownLatch;
8+
9+
import javax.net.SocketFactory;
10+
11+
public class CloseInMainLoop extends BrokerTestCase{
12+
class SpecialConnection extends AMQConnection{
13+
private AtomicBoolean validShutdown = new AtomicBoolean();
14+
15+
public boolean hadValidShutdown(){
16+
if(isOpen()) throw new IllegalStateException("hadValidShutdown called while connection is still open");
17+
return validShutdown.get();
18+
}
19+
20+
public SpecialConnection() throws Exception{
21+
super(new ConnectionParameters(), new SocketFrameHandler(SocketFactory.getDefault(), "localhost", 5672));
22+
this.start(true);
23+
}
24+
25+
@Override
26+
public boolean processControlCommand(Command c) throws IOException{
27+
if(c.getMethod() instanceof AMQP.Connection.CloseOk) validShutdown.set(true);
28+
return super.processControlCommand(c);
29+
}
30+
}
31+
32+
33+
public void testCloseOKNormallyReceived() throws Exception{
34+
SpecialConnection connection = new SpecialConnection();
35+
connection.close();
36+
assertTrue(connection.hadValidShutdown());
37+
}
38+
39+
public void testCloseWithFaultyConsumer() throws Exception{
40+
SpecialConnection connection = new SpecialConnection();
41+
Channel channel = connection.createChannel();
42+
channel.exchangeDeclare("x", "direct");
43+
channel.queueDeclare("q");
44+
channel.queueDelete("q");
45+
channel.queueDeclare("q");
46+
channel.queueBind("q", "x", "k");
47+
48+
final CountDownLatch latch = new CountDownLatch(1);
49+
50+
channel.basicConsume("q", true, new DefaultConsumer(channel){
51+
public void handleDelivery(String consumerTag,
52+
Envelope envelope,
53+
AMQP.BasicProperties properties,
54+
byte[] body){
55+
latch.countDown();
56+
throw new RuntimeException("I am a bad consumer");
57+
}
58+
});
59+
60+
channel.basicPublish("x", "k", null, new byte[10]);
61+
62+
latch.await();
63+
Thread.sleep(200);
64+
assertTrue(connection.hadValidShutdown());
65+
}
66+
67+
}

0 commit comments

Comments
 (0)