Skip to content

Commit 13cec2a

Browse files
author
Rob Harrop
committed
merge with default
2 parents 95cbe95 + e7f9a91 commit 13cec2a

File tree

6 files changed

+149
-115
lines changed

6 files changed

+149
-115
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -412,26 +412,15 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
412412
*/
413413
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
414414

415-
/**
416-
* Purges the contents of the given queue and awaits a completion.
417-
* @see com.rabbitmq.client.AMQP.Queue.Purge
418-
* @see com.rabbitmq.client.AMQP.Queue.PurgeOk
419-
* @param queue the name of the queue
420-
* @return a purge-confirm method if the purge was executed succesfully
421-
* @throws java.io.IOException if an error is encountered
422-
*/
423-
Queue.PurgeOk queuePurge(String queue) throws IOException;
424-
425415
/**
426416
* Purges the contents of the given queue.
427417
* @see com.rabbitmq.client.AMQP.Queue.Purge
428418
* @see com.rabbitmq.client.AMQP.Queue.PurgeOk
429419
* @param queue the name of the queue
430-
* @param nowait whether to await completion of the purge
431420
* @return a purge-confirm method if the purge was executed succesfully
432421
* @throws java.io.IOException if an error is encountered
433422
*/
434-
Queue.PurgeOk queuePurge(String queue, boolean nowait) throws IOException;
423+
Queue.PurgeOk queuePurge(String queue) throws IOException;
435424

436425
/**
437426
* Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}
@@ -519,12 +508,13 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
519508
* @param noLocal flag set to true unless server local buffering is required
520509
* @param exclusive true if this is an exclusive consumer
521510
* @param callback an interface to the consumer object
511+
* @param arguments a set of arguments for the consume
522512
* @return the consumerTag associated with the new consumer
523513
* @throws java.io.IOException if an error is encountered
524514
* @see com.rabbitmq.client.AMQP.Basic.Consume
525515
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
526516
*/
527-
String basicConsume(String queue, boolean noAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> filter, Consumer callback) throws IOException;
517+
String basicConsume(String queue, boolean noAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
528518

529519
/**
530520
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -596,16 +596,9 @@ public Queue.UnbindOk queueUnbind(String queue, String exchange, String routingK
596596
/** Public API - {@inheritDoc} */
597597
public Queue.PurgeOk queuePurge(String queue)
598598
throws IOException
599-
{
600-
return queuePurge(queue, false);
601-
}
602-
603-
/** Public API - {@inheritDoc} */
604-
public Queue.PurgeOk queuePurge(String queue, boolean nowait)
605-
throws IOException
606599
{
607600
return (Queue.PurgeOk)
608-
exnWrappingRpc(new Queue.Purge(TICKET, queue, nowait)).getMethod();
601+
exnWrappingRpc(new Queue.Purge(TICKET, queue, false)).getMethod();
609602
}
610603

611604
/** Public API - {@inheritDoc} */
@@ -677,7 +670,7 @@ public String basicConsume(String queue, boolean noAck, String consumerTag,
677670

678671
/** Public API - {@inheritDoc} */
679672
public String basicConsume(String queue, boolean noAck, String consumerTag,
680-
boolean noLocal, boolean exclusive, Map<String, Object> filter,
673+
boolean noLocal, boolean exclusive, Map<String, Object> arguments,
681674
final Consumer callback)
682675
throws IOException
683676
{
@@ -702,7 +695,7 @@ public String transformReply(AMQCommand replyCommand) {
702695

703696
rpc(new Basic.Consume(TICKET, queue, consumerTag,
704697
noLocal, noAck, exclusive,
705-
false, filter),
698+
false, arguments),
706699
k);
707700

708701
try {

test/src/com/rabbitmq/client/test/functional/QueueLease.java

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class QueueLease extends BrokerTestCase {
4646

4747
// Currently the expiration timer is very responsive but this may
4848
// very well change in the future, so tweak accordingly.
49-
private final static long QUEUE_EXPIRES = 1000L; // msecs
49+
private final static int QUEUE_EXPIRES = 1000; // msecs
5050
private final static int SHOULD_EXPIRE_WITHIN = 2000;
5151

5252
/**
@@ -65,27 +65,42 @@ public void testDoesNotExpireOthers() throws IOException,
6565
verifyQueueExpires(TEST_NORMAL_QUEUE, false);
6666
}
6767

68-
/**
69-
* Verify that the server throws an error if the type of x-expires is not
70-
* long.
71-
*/
72-
public void testExpireMustBeLong() throws IOException {
68+
public void testExpireMayBeByte() throws IOException {
7369
Map<String, Object> args = new HashMap<String, Object>();
74-
args.put("x-expires", 100);
70+
args.put("x-expires", (byte)100);
7571

7672
try {
77-
channel
78-
.queueDeclare("expiresMustBeLong", false, false, false,
79-
args);
80-
fail("server accepted x-expires not of type long");
73+
channel.queueDeclare("expiresMayBeByte", false, true, false, args);
8174
} catch (IOException e) {
82-
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
75+
fail("server did not accept x-expires of type byte");
76+
}
77+
}
78+
79+
public void testExpireMayBeShort() throws IOException {
80+
Map<String, Object> args = new HashMap<String, Object>();
81+
args.put("x-expires", (short)100);
82+
83+
try {
84+
channel.queueDeclare("expiresMayBeShort", false, true, false, args);
85+
} catch (IOException e) {
86+
fail("server did not accept x-expires of type short");
87+
}
88+
}
89+
90+
public void testExpireMayBeLong() throws IOException {
91+
Map<String, Object> args = new HashMap<String, Object>();
92+
args.put("x-expires", 100L);
93+
94+
try {
95+
channel.queueDeclare("expiresMayBeLong", false, true, false, args);
96+
} catch (IOException e) {
97+
fail("server did not accept x-expires of type long");
8398
}
8499
}
85100

86101
public void testExpireMustBeGtZero() throws IOException {
87102
Map<String, Object> args = new HashMap<String, Object>();
88-
args.put("x-expires", 0L);
103+
args.put("x-expires", 0);
89104

90105
try {
91106
channel.queueDeclare("expiresMustBeGtZero", false, false, false,
@@ -98,7 +113,7 @@ public void testExpireMustBeGtZero() throws IOException {
98113

99114
public void testExpireMustBePositive() throws IOException {
100115
Map<String, Object> args = new HashMap<String, Object>();
101-
args.put("x-expires", -10L);
116+
args.put("x-expires", -10);
102117

103118
try {
104119
channel.queueDeclare("expiresMustBePositive", false, false, false,
@@ -115,9 +130,9 @@ public void testExpireMustBePositive() throws IOException {
115130
*/
116131
public void testQueueRedeclareEquivalence() throws IOException {
117132
Map<String, Object> args1 = new HashMap<String, Object>();
118-
args1.put("x-expires", 10000L);
133+
args1.put("x-expires", 10000);
119134
Map<String, Object> args2 = new HashMap<String, Object>();
120-
args2.put("x-expires", 20000L);
135+
args2.put("x-expires", 20000);
121136

122137
channel.queueDeclare(TEST_EXPIRE_REDECLARE_QUEUE, false, false, false,
123138
args1);
@@ -145,6 +160,7 @@ void verifyQueueExpires(String name, boolean expire) throws IOException,
145160
try {
146161
channel.queueDeclarePassive(name);
147162
} catch (IOException e) {
163+
checkShutdownSignal(AMQP.NOT_FOUND, e);
148164
fail("Queue expired before deadline.");
149165
}
150166

test/src/com/rabbitmq/client/test/functional/UnexpectedFrames.java

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,53 @@
1313
* Test that the server correctly handles us when we send it bad frames
1414
*/
1515
public class UnexpectedFrames extends BrokerTestCase {
16+
1617
private interface Confuser {
1718
public Frame confuse(Frame frame) throws IOException;
1819
}
1920

20-
@Override protected void setUp() throws IOException {}
21+
private static class ConfusedFrameHandler extends SocketFrameHandler {
22+
23+
private boolean confusedOnce = false;
24+
25+
public ConfusedFrameHandler(Socket socket) throws IOException {
26+
super(socket);
27+
}
28+
29+
@Override
30+
public void writeFrame(Frame frame) throws IOException {
31+
if (confusedOnce) {
32+
super.writeFrame(frame);
33+
} else {
34+
Frame confusedFrame = confuser.confuse(frame);
35+
if (confusedFrame != frame) confusedOnce = true;
36+
if (confusedFrame != null) {
37+
super.writeFrame(confusedFrame);
38+
}
39+
}
40+
}
41+
42+
public Confuser confuser = new Confuser() {
43+
public Frame confuse(Frame frame) {
44+
// Do nothing to start with, we need to negotiate before the
45+
// server will send us unexpected_frame errors
46+
return frame;
47+
}
48+
};
49+
}
2150

22-
@Override protected void tearDown() throws IOException {}
51+
private static class ConfusedConnectionFactory extends ConnectionFactory {
52+
53+
@Override protected FrameHandler createFrameHandler(Socket sock)
54+
throws IOException {
55+
return new ConfusedFrameHandler(sock);
56+
}
57+
}
58+
59+
public UnexpectedFrames() {
60+
super();
61+
connectionFactory = new ConfusedConnectionFactory();
62+
}
2363

2464
public void testMissingHeader() throws IOException {
2565
expectUnexpectedFrameError(new Confuser() {
@@ -38,7 +78,11 @@ public Frame confuse(Frame frame) {
3878
if (frame.type == AMQP.FRAME_METHOD) {
3979
// We can't just skip the method as that will lead us to
4080
// send 0 bytes and hang waiting for a response.
41-
frame.type = AMQP.FRAME_HEADER;
81+
Frame confusedFrame = new Frame(AMQP.FRAME_HEADER,
82+
frame.channel,
83+
frame.payload);
84+
confusedFrame.accumulator = frame.accumulator;
85+
return confusedFrame;
4286
}
4387
return frame;
4488
}
@@ -73,53 +117,16 @@ public Frame confuse(Frame frame) throws IOException {
73117
});
74118
}
75119

76-
private void expectUnexpectedFrameError(Confuser confuser) throws IOException {
77-
ConnectionFactory factory = new ConnectionFactory();
78-
Socket socket = factory.getSocketFactory().createSocket("localhost",
79-
AMQP.PROTOCOL.PORT);
80-
ConfusedFrameHandler handler = new ConfusedFrameHandler(socket);
81-
AMQConnection connection = new AMQConnection(factory, handler);
82-
connection.start();
83-
Channel channel = connection.createChannel();
84-
85-
handler.confuser = confuser;
86-
87-
try {
88-
//NB: the frame confuser relies on the encoding of the
89-
//method field to be at least 8 bytes long
90-
channel.basicPublish("", "routing key", null, "Hello".getBytes());
91-
channel.basicQos(0);
92-
fail("We should have seen an UNEXPECTED_FRAME by now");
93-
}
94-
catch (IOException e) {
95-
checkShutdownSignal(AMQP.UNEXPECTED_FRAME, e);
96-
}
97-
}
120+
private void expectUnexpectedFrameError(Confuser confuser)
121+
throws IOException {
98122

99-
private static class ConfusedFrameHandler extends SocketFrameHandler {
100-
public ConfusedFrameHandler(Socket socket) throws IOException {
101-
super(socket);
102-
}
123+
((ConfusedFrameHandler)((AMQConnection)connection).getFrameHandler()).
124+
confuser = confuser;
103125

104-
@Override
105-
public void writeFrame(Frame frame) throws IOException {
106-
Frame confusedFrame = new Frame();
107-
confusedFrame.accumulator = frame.accumulator;
108-
confusedFrame.channel = frame.channel;
109-
confusedFrame.type = frame.type;
110-
111-
confusedFrame = confuser.confuse(confusedFrame);
112-
if (confusedFrame != null) {
113-
super.writeFrame(confusedFrame);
114-
}
115-
}
116-
117-
public Confuser confuser = new Confuser() {
118-
public Frame confuse(Frame frame) {
119-
// Do nothing to start with, we need to negotiate before the
120-
// server will send us unexpected_frame errors
121-
return frame;
122-
}
123-
};
126+
//NB: the frame confuser relies on the encoding of the
127+
//method field to be at least 8 bytes long
128+
channel.basicPublish("", "routing key", null, "Hello".getBytes());
129+
expectError(AMQP.UNEXPECTED_FRAME);
124130
}
131+
125132
}

0 commit comments

Comments
 (0)