Skip to content

Commit a82d39d

Browse files
author
Simon MacMullen
committed
Merge bug23896
2 parents 17a4986 + 9637a39 commit a82d39d

File tree

6 files changed

+130
-122
lines changed

6 files changed

+130
-122
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ public interface Channel extends ShutdownNotifier {
231231
void basicQos(int prefetchCount) throws IOException;
232232

233233
/**
234-
* Publish a message with both "mandatory" and "immediate" flags set to false
234+
* Publish a message
235235
* @see com.rabbitmq.client.AMQP.Basic.Publish
236236
* @param exchange the exchange to publish the message to
237237
* @param routingKey the routing key
@@ -246,8 +246,22 @@ public interface Channel extends ShutdownNotifier {
246246
* @see com.rabbitmq.client.AMQP.Basic.Publish
247247
* @param exchange the exchange to publish the message to
248248
* @param routingKey the routing key
249-
* @param mandatory true if we are requesting a mandatory publish
250-
* @param immediate true if we are requesting an immediate publish
249+
* @param mandatory true if the 'mandatory' flag is to be set
250+
* @param props other properties for the message - routing headers etc
251+
* @param body the message body
252+
* @throws java.io.IOException if an error is encountered
253+
*/
254+
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
255+
throws IOException;
256+
257+
/**
258+
* Publish a message
259+
* @see com.rabbitmq.client.AMQP.Basic.Publish
260+
* @param exchange the exchange to publish the message to
261+
* @param routingKey the routing key
262+
* @param mandatory true if the 'mandatory' flag is to be set
263+
* @param immediate true if the 'immediate' flag is to be
264+
* set. Note that the RabbitMQ server does not support this flag.
251265
* @param props other properties for the message - routing headers etc
252266
* @param body the message body
253267
* @throws java.io.IOException if an error is encountered

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,16 @@ public void basicPublish(String exchange, String routingKey,
605605
BasicProperties props, byte[] body)
606606
throws IOException
607607
{
608-
basicPublish(exchange, routingKey, false, false, props, body);
608+
basicPublish(exchange, routingKey, false, props, body);
609+
}
610+
611+
/** Public API - {@inheritDoc} */
612+
public void basicPublish(String exchange, String routingKey,
613+
boolean mandatory,
614+
BasicProperties props, byte[] body)
615+
throws IOException
616+
{
617+
basicPublish(exchange, routingKey, mandatory, false, props, body);
609618
}
610619

611620
/** Public API - {@inheritDoc} */

test/src/com/rabbitmq/client/test/functional/AlternateExchange.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -139,36 +139,35 @@ protected void checkGet(boolean[] expected) throws IOException {
139139
*
140140
* @param key the routing key of the message to be sent
141141
* @param mandatory whether the message should be marked as 'mandatory'
142-
* @param immediate whether the message should be marked as 'immediate'
143142
* @param expected indicates which queues we expect the message to
144143
* get routed to
145144
* @param ret whether a 'basic.return' is expected
146145
*
147146
* @see #checkGet(boolean[])
148147
*/
149-
protected void check(String key, boolean mandatory, boolean immediate,
150-
boolean[] expected, boolean ret)
148+
protected void check(String key, boolean mandatory, boolean[] expected,
149+
boolean ret)
151150
throws IOException {
152151

153152
gotReturn.set(false);
154-
channel.basicPublish("x", key, mandatory, immediate, null,
153+
channel.basicPublish("x", key, mandatory, false, null,
155154
"ae-test".getBytes());
156155
checkGet(expected);
157156
assertEquals(ret, gotReturn.get());
158157
}
159158

160159
protected void check(String key, boolean[] expected, boolean ret)
161160
throws IOException {
162-
check(key, false, false, expected, ret);
161+
check(key, false, expected, ret);
163162
}
164163

165-
protected void check(String key, boolean mandatory, boolean immediate,
166-
boolean ret) throws IOException {
167-
check(key, mandatory, immediate, expected(key), ret);
164+
protected void check(String key, boolean mandatory, boolean ret)
165+
throws IOException {
166+
check(key, mandatory, expected(key), ret);
168167
}
169168

170169
protected void check(String key, boolean ret) throws IOException {
171-
check(key, false, false, ret);
170+
check(key, false, ret);
172171
}
173172

174173
/**
@@ -198,9 +197,7 @@ public void testAe() throws IOException {
198197
//ordinary
199198
check(k, false);
200199
//mandatory
201-
check(k, true, false, k.equals("z"));
202-
//immediate
203-
check(k, false, true, unrouted, true);
200+
check(k, true, k.equals("z"));
204201
}
205202

206203
cleanup();

test/src/com/rabbitmq/client/test/functional/Confirm.java

Lines changed: 20 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -60,45 +60,34 @@ protected void setUp() throws IOException {
6060
"confirm-multiple-queues");
6161
}
6262

63-
public void testPersistentMandatoryImmediateCombinations()
63+
public void testPersistentMandatoryCombinations()
6464
throws IOException, InterruptedException
6565
{
6666
boolean b[] = { false, true };
6767
for (boolean persistent : b) {
6868
for (boolean mandatory : b) {
69-
for (boolean immediate : b) {
70-
confirmTest("", "confirm-test",
71-
persistent, mandatory, immediate);
72-
}
69+
confirmTest("", "confirm-test", persistent, mandatory);
7370
}
7471
}
7572
}
7673

7774
public void testNonDurable()
7875
throws IOException, InterruptedException
7976
{
80-
confirmTest("", "confirm-test-nondurable", true, false, false);
81-
}
82-
83-
public void testImmediateNoConsumer()
84-
throws IOException, InterruptedException
85-
{
86-
confirmTest("", "confirm-test-noconsumer", false, false, true);
87-
confirmTest("", "confirm-test-noconsumer", true, false, true);
77+
confirmTest("", "confirm-test-nondurable", true, false);
8878
}
8979

9080
public void testMandatoryNoRoute()
9181
throws IOException, InterruptedException
9282
{
93-
confirmTest("", "confirm-test-doesnotexist", false, true, false);
94-
confirmTest("", "confirm-test-doesnotexist", true, true, false);
83+
confirmTest("", "confirm-test-doesnotexist", false, true);
84+
confirmTest("", "confirm-test-doesnotexist", true, true);
9585
}
9686

9787
public void testMultipleQueues()
9888
throws IOException, InterruptedException
9989
{
100-
confirmTest("amq.direct", "confirm-multiple-queues",
101-
true, false, false);
90+
confirmTest("amq.direct", "confirm-multiple-queues", true, false);
10291
}
10392

10493
/* For testQueueDelete and testQueuePurge to be
@@ -109,7 +98,7 @@ public void testMultipleQueues()
10998
public void testQueueDelete()
11099
throws IOException, InterruptedException
111100
{
112-
publishN("","confirm-test-noconsumer", true, false, false);
101+
publishN("","confirm-test-noconsumer", true, false);
113102

114103
channel.queueDelete("confirm-test-noconsumer");
115104

@@ -119,7 +108,7 @@ public void testQueueDelete()
119108
public void testQueuePurge()
120109
throws IOException, InterruptedException
121110
{
122-
publishN("", "confirm-test-noconsumer", true, false, false);
111+
publishN("", "confirm-test-noconsumer", true, false);
123112

124113
channel.queuePurge("confirm-test-noconsumer");
125114

@@ -142,7 +131,7 @@ public void testQueueTTL()
142131
Collections.singletonMap(TTL_ARG, (Object)ttl);
143132
channel.queueDeclare("confirm-ttl", true, true, false, argMap);
144133

145-
publishN("", "confirm-ttl", true, false, false);
134+
publishN("", "confirm-ttl", true, false);
146135
channel.waitForConfirmsOrDie();
147136

148137
channel.queueDelete("confirm-ttl");
@@ -166,7 +155,7 @@ public void testBasicRejectRequeue()
166155
public void testBasicRecover()
167156
throws IOException, InterruptedException
168157
{
169-
publishN("", "confirm-test-noconsumer", true, false, false);
158+
publishN("", "confirm-test-noconsumer", true, false);
170159

171160
for (long i = 0; i < NUM_MESSAGES; i++) {
172161
GetResponse resp =
@@ -231,7 +220,7 @@ public void handleNack(long seqNo, boolean multiple) {
231220

232221
for (long i = 0; i < NUM_MESSAGES; i++) {
233222
unconfirmedSet.add(channel.getNextPublishSeqNo());
234-
publish("", "confirm-test", true, false, false);
223+
publish("", "confirm-test", true, false);
235224
}
236225

237226
channel.waitForConfirmsOrDie();
@@ -245,14 +234,14 @@ public void testWaitForConfirmsNoOp()
245234
{
246235
channel = connection.createChannel();
247236
// Don't enable Confirm mode
248-
publish("", "confirm-test", true, false, false);
237+
publish("", "confirm-test", true, false);
249238
channel.waitForConfirmsOrDie(); // Nop
250239
}
251240

252241
public void testWaitForConfirmsException()
253242
throws IOException, InterruptedException
254243
{
255-
publishN("", "confirm-test", true, false, false);
244+
publishN("", "confirm-test", true, false);
256245
channel.close();
257246
try {
258247
channel.waitForConfirmsOrDie();
@@ -268,29 +257,27 @@ public void testWaitForConfirmsException()
268257

269258
/* Publish NUM_MESSAGES messages and wait for confirmations. */
270259
public void confirmTest(String exchange, String queueName,
271-
boolean persistent, boolean mandatory,
272-
boolean immediate)
260+
boolean persistent, boolean mandatory)
273261
throws IOException, InterruptedException
274262
{
275-
publishN(exchange, queueName, persistent, mandatory, immediate);
263+
publishN(exchange, queueName, persistent, mandatory);
276264

277265
channel.waitForConfirmsOrDie();
278266
}
279267

280268
private void publishN(String exchangeName, String queueName,
281-
boolean persistent, boolean mandatory,
282-
boolean immediate)
269+
boolean persistent, boolean mandatory)
283270
throws IOException
284271
{
285272
for (long i = 0; i < NUM_MESSAGES; i++) {
286-
publish(exchangeName, queueName, persistent, mandatory, immediate);
273+
publish(exchangeName, queueName, persistent, mandatory);
287274
}
288275
}
289276

290277
private void basicRejectCommon(boolean requeue)
291278
throws IOException
292279
{
293-
publishN("", "confirm-test-noconsumer", true, false, false);
280+
publishN("", "confirm-test-noconsumer", true, false);
294281

295282
for (long i = 0; i < NUM_MESSAGES; i++) {
296283
GetResponse resp =
@@ -301,10 +288,9 @@ private void basicRejectCommon(boolean requeue)
301288
}
302289

303290
protected void publish(String exchangeName, String queueName,
304-
boolean persistent, boolean mandatory,
305-
boolean immediate)
291+
boolean persistent, boolean mandatory)
306292
throws IOException {
307-
channel.basicPublish(exchangeName, queueName, mandatory, immediate,
293+
channel.basicPublish(exchangeName, queueName, mandatory, false,
308294
persistent ? MessageProperties.PERSISTENT_BASIC
309295
: MessageProperties.BASIC,
310296
"nop".getBytes());

test/src/com/rabbitmq/client/test/functional/Routing.java

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
import com.rabbitmq.client.test.BrokerTestCase;
2121
import com.rabbitmq.client.AMQP;
2222
import com.rabbitmq.client.GetResponse;
23+
import com.rabbitmq.client.ReturnListener;
24+
import com.rabbitmq.utility.BlockingCell;
2325

2426
import java.io.IOException;
2527
import java.util.List;
2628
import java.util.ArrayList;
2729
import java.util.HashMap;
2830
import java.util.Map;
31+
import java.util.concurrent.TimeoutException;
2932

3033
public class Routing extends BrokerTestCase
3134
{
@@ -34,6 +37,8 @@ public class Routing extends BrokerTestCase
3437
protected final String Q1 = "foo";
3538
protected final String Q2 = "bar";
3639

40+
private volatile BlockingCell<Integer> returnCell;
41+
3742
protected void createResources() throws IOException {
3843
channel.exchangeDeclare(E, "direct");
3944
channel.queueDeclare(Q1, false, false, false, null);
@@ -228,5 +233,73 @@ public void testHeadersRouting() throws Exception {
228233
checkGet(Q2, false);
229234
}
230235

231-
}
236+
public void testBasicReturn() throws IOException {
237+
channel.addReturnListener(makeReturnListener());
238+
returnCell = new BlockingCell<Integer>();
239+
240+
//returned 'mandatory' publish
241+
channel.basicPublish("", "unknown", true, false, null, "mandatory1".getBytes());
242+
checkReturn(AMQP.NO_ROUTE);
243+
244+
//routed 'mandatory' publish
245+
channel.basicPublish("", Q1, true, false, null, "mandatory2".getBytes());
246+
assertNotNull(channel.basicGet(Q1, true));
247+
248+
//'immediate' publish
249+
channel.basicPublish("", Q1, false, true, null, "immediate".getBytes());
250+
try {
251+
channel.basicQos(0); //flush
252+
fail("basic.publish{immediate=true} should not be supported");
253+
} catch (IOException ioe) {
254+
checkShutdownSignal(AMQP.NOT_IMPLEMENTED, ioe);
255+
}
256+
}
232257

258+
public void testBasicReturnTransactional() throws IOException {
259+
channel.txSelect();
260+
channel.addReturnListener(makeReturnListener());
261+
returnCell = new BlockingCell<Integer>();
262+
263+
//returned 'mandatory' publish
264+
channel.basicPublish("", "unknown", true, false, null, "mandatory1".getBytes());
265+
try {
266+
returnCell.uninterruptibleGet(200);
267+
fail("basic.return issued prior to tx.commit");
268+
} catch (TimeoutException toe) {}
269+
channel.txCommit();
270+
checkReturn(AMQP.NO_ROUTE);
271+
272+
//routed 'mandatory' publish
273+
channel.basicPublish("", Q1, true, false, null, "mandatory2".getBytes());
274+
channel.txCommit();
275+
assertNotNull(channel.basicGet(Q1, true));
276+
277+
//returned 'mandatory' publish when message is routable on
278+
//publish but not on commit
279+
channel.basicPublish("", Q1, true, false, null, "mandatory2".getBytes());
280+
channel.queueDelete(Q1);
281+
channel.txCommit();
282+
checkReturn(AMQP.NO_ROUTE);
283+
channel.queueDeclare(Q1, false, false, false, null);
284+
}
285+
286+
protected ReturnListener makeReturnListener() {
287+
return new ReturnListener() {
288+
public void handleReturn(int replyCode,
289+
String replyText,
290+
String exchange,
291+
String routingKey,
292+
AMQP.BasicProperties properties,
293+
byte[] body)
294+
throws IOException {
295+
Routing.this.returnCell.set(replyCode);
296+
}
297+
};
298+
}
299+
300+
protected void checkReturn(int replyCode) {
301+
assertEquals((int)returnCell.uninterruptibleGet(), AMQP.NO_ROUTE);
302+
returnCell = new BlockingCell<Integer>();
303+
}
304+
305+
}

0 commit comments

Comments
 (0)