Skip to content

Commit 41f5a80

Browse files
author
Matthew Sackman
committed
Merging bug23357 into default
2 parents ce70078 + f7026a0 commit 41f5a80

File tree

8 files changed

+486
-19
lines changed

8 files changed

+486
-19
lines changed

Makefile

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
VERSION=0.0.0
2-
32
PACKAGE_NAME=rabbitmq-java-client
4-
53
JAVADOC_ARCHIVE=$(PACKAGE_NAME)-javadoc-$(VERSION)
64
SRC_ARCHIVE=$(PACKAGE_NAME)-$(VERSION)
75
SIGNING_KEY=056E8E56

src/com/rabbitmq/client/Channel.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,56 @@ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable
301301
*/
302302
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
303303

304+
/**
305+
* Bind an exchange to an exchange, with no extra arguments.
306+
* @see com.rabbitmq.client.AMQP.Exchange.Bind
307+
* @see com.rabbitmq.client.AMQP.Exchange.BindOk
308+
* @param destination: the name of the exchange to which messages flow across the binding
309+
* @param source: the name of the exchange from which messages flow across the binding
310+
* @param routingKey: the routine key to use for the binding
311+
* @return a binding-confirm method if the binding was successfully created
312+
* @throws java.io.IOException if an error is encountered
313+
*/
314+
Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
315+
316+
/**
317+
* Bind an exchange to an exchange.
318+
* @see com.rabbitmq.client.AMQP.Exchange.Bind
319+
* @see com.rabbitmq.client.AMQP.Exchange.BindOk
320+
* @param destination: the name of the exchange to which messages flow across the binding
321+
* @param source: the name of the exchange from which messages flow across the binding
322+
* @param routingKey: the routine key to use for the binding
323+
* @param arguments: other properties (binding parameters)
324+
* @return a binding-confirm method if the binding was successfully created
325+
* @throws java.io.IOException if an error is encountered
326+
*/
327+
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
328+
329+
/**
330+
* Unbind an exchange from an exchange, with no extra arguments.
331+
* @see com.rabbitmq.client.AMQP.Exchange.Bind
332+
* @see com.rabbitmq.client.AMQP.Exchange.BindOk
333+
* @param destination: the name of the exchange to which messages flow across the binding
334+
* @param source: the name of the exchange from which messages flow across the binding
335+
* @param routingKey: the routine key to use for the binding
336+
* @return a binding-confirm method if the binding was successfully created
337+
* @throws java.io.IOException if an error is encountered
338+
*/
339+
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;
340+
341+
/**
342+
* Unbind an exchange from an exchange.
343+
* @see com.rabbitmq.client.AMQP.Exchange.Bind
344+
* @see com.rabbitmq.client.AMQP.Exchange.BindOk
345+
* @param destination: the name of the exchange to which messages flow across the binding
346+
* @param source: the name of the exchange from which messages flow across the binding
347+
* @param routingKey: the routine key to use for the binding
348+
* @param arguments: other properties (binding parameters)
349+
* @return a binding-confirm method if the binding was successfully created
350+
* @throws java.io.IOException if an error is encountered
351+
*/
352+
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
353+
304354
/**
305355
* Actively declare a server-named exclusive, autodelete, non-durable queue.
306356
* The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -92,32 +92,43 @@ public void handleSignal(ShutdownSignalException signal) {
9292
}
9393
}
9494

95-
public synchronized ChannelN createChannel(AMQConnection connection) throws IOException {
96-
int channelNumber = channelNumberAllocator.allocate();
97-
if (channelNumber == -1) {
98-
return null;
95+
public ChannelN createChannel(AMQConnection connection) throws IOException {
96+
int channelNumber;
97+
synchronized (this) {
98+
channelNumber = channelNumberAllocator.allocate();
99+
if (channelNumber == -1) {
100+
return null;
101+
}
99102
}
100103
return createChannelInternal(connection, channelNumber);
101104
}
102105

103-
public synchronized ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {
104-
if(channelNumberAllocator.reserve(channelNumber))
106+
public ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {
107+
boolean reserved;
108+
synchronized (this) {
109+
reserved = channelNumberAllocator.reserve(channelNumber);
110+
}
111+
if(reserved)
105112
return createChannelInternal(connection, channelNumber);
106113
else
107114
return null;
108115
}
109116

110-
private synchronized ChannelN createChannelInternal(AMQConnection connection, int channelNumber) throws IOException {
111-
if (_channelMap.containsKey(channelNumber)) {
112-
// That number's already allocated! Can't do it
113-
// This should never happen unless something has gone
114-
// badly wrong with our implementation.
115-
throw new IllegalStateException("We have attempted to "
116-
+ "create a channel with a number that is already in "
117-
+ "use. This should never happen. Please report this as a bug.");
117+
private ChannelN createChannelInternal(AMQConnection connection, int channelNumber) throws IOException {
118+
ChannelN ch;
119+
synchronized (this) {
120+
if (_channelMap.containsKey(channelNumber)) {
121+
// That number's already allocated! Can't do it
122+
// This should never happen unless something has gone
123+
// badly wrong with our implementation.
124+
throw new IllegalStateException("We have attempted to "
125+
+ "create a channel with a number that is already in "
126+
+ "use. This should never happen. "
127+
+ "Please report this as a bug.");
128+
}
129+
ch = new ChannelN(connection, channelNumber);
130+
addChannel(ch);
118131
}
119-
ChannelN ch = new ChannelN(connection, channelNumber);
120-
addChannel(ch);
121132
ch.open(); // now that it's been added to our internal tables
122133
return ch;
123134
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,36 @@ public Exchange.DeleteOk exchangeDelete(String exchange)
524524
return exchangeDelete(exchange, false);
525525
}
526526

527+
/** Public API - {@inheritDoc} */
528+
public Exchange.BindOk exchangeBind(String destination, String source,
529+
String routingKey, Map<String, Object> arguments)
530+
throws IOException {
531+
return (Exchange.BindOk) exnWrappingRpc(
532+
new Exchange.Bind(TICKET, destination, source, routingKey,
533+
false, arguments)).getMethod();
534+
}
535+
536+
/** Public API - {@inheritDoc} */
537+
public Exchange.BindOk exchangeBind(String destination, String source,
538+
String routingKey) throws IOException {
539+
return exchangeBind(destination, source, routingKey, null);
540+
}
541+
542+
/** Public API - {@inheritDoc} */
543+
public Exchange.UnbindOk exchangeUnbind(String destination, String source,
544+
String routingKey, Map<String, Object> arguments)
545+
throws IOException {
546+
return (Exchange.UnbindOk) exnWrappingRpc(
547+
new Exchange.Unbind(TICKET, destination, source, routingKey,
548+
false, arguments)).getMethod();
549+
}
550+
551+
/** Public API - {@inheritDoc} */
552+
public Exchange.UnbindOk exchangeUnbind(String destination, String source,
553+
String routingKey) throws IOException {
554+
return exchangeUnbind(destination, source, routingKey, null);
555+
}
556+
527557
/** Public API - {@inheritDoc} */
528558
public Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
529559
boolean autoDelete, Map<String, Object> arguments)
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.functional;
33+
34+
import java.io.IOException;
35+
36+
import com.rabbitmq.client.AMQP;
37+
import com.rabbitmq.client.QueueingConsumer;
38+
import com.rabbitmq.client.ShutdownSignalException;
39+
import com.rabbitmq.client.QueueingConsumer.Delivery;
40+
import com.rabbitmq.client.test.BrokerTestCase;
41+
42+
public class ExchangeExchangeBindings extends BrokerTestCase {
43+
44+
private static final int TIMEOUT = 1000;
45+
private static final byte[] MARKER = "MARK".getBytes();
46+
47+
private final String[] queues = new String[] { "q0", "q1", "q2" };
48+
private final String[] exchanges = new String[] { "e0", "e1", "e2" };
49+
private final String[][] bindings = new String[][] { { "q0", "e0" },
50+
{ "q1", "e1" },
51+
{ "q2", "e2" } };
52+
53+
private QueueingConsumer[] consumers = new QueueingConsumer[] { null, null,
54+
null };
55+
56+
protected void publishWithMarker(String x, String rk) throws IOException {
57+
basicPublishVolatile(x, rk);
58+
basicPublishVolatile(MARKER, x, rk);
59+
}
60+
61+
@Override
62+
protected void createResources() throws IOException {
63+
for (String q : queues) {
64+
channel.queueDeclare(q, false, false, false, null);
65+
}
66+
for (String e : exchanges) {
67+
channel.exchangeDeclare(e, "fanout");
68+
}
69+
for (String[] binding : bindings) {
70+
channel.queueBind(binding[0], binding[1], "");
71+
}
72+
for (int idx = 0; idx < consumers.length; ++idx) {
73+
QueueingConsumer consumer = new QueueingConsumer(channel);
74+
channel.basicConsume(queues[idx], true, consumer);
75+
consumers[idx] = consumer;
76+
}
77+
}
78+
79+
@Override
80+
protected void releaseResources() throws IOException {
81+
for (String q : queues) {
82+
channel.queueDelete(q);
83+
}
84+
for (String e : exchanges) {
85+
channel.exchangeDelete(e);
86+
}
87+
}
88+
89+
protected void consumeNoDuplicates(QueueingConsumer consumer)
90+
throws ShutdownSignalException, InterruptedException {
91+
assertNotNull(consumer.nextDelivery(TIMEOUT));
92+
Delivery markerDelivery = consumer.nextDelivery(TIMEOUT);
93+
assertEquals(new String(MARKER), new String(markerDelivery.getBody()));
94+
}
95+
96+
public void testBindingCreationDeletion() throws IOException {
97+
channel.exchangeBind("e2", "e1", "");
98+
channel.exchangeBind("e2", "e1", "");
99+
channel.exchangeUnbind("e2", "e1", "");
100+
try {
101+
channel.exchangeUnbind("e2", "e1", "");
102+
fail("expected not_found in testBindingCreationDeletion");
103+
} catch (IOException e) {
104+
checkShutdownSignal(AMQP.NOT_FOUND, e);
105+
}
106+
}
107+
108+
/* pre (eN --> qN) for N in [0..2]
109+
* test (e0 --> q0)
110+
* add binding (e1 --> e0)
111+
* test (e1 --> {q1, q0})
112+
* add binding (e2 --> e1)
113+
* test (e2 --> {q2, q1, q0})
114+
*/
115+
public void testSimpleChains() throws IOException, ShutdownSignalException,
116+
InterruptedException {
117+
publishWithMarker("e0", "");
118+
consumeNoDuplicates(consumers[0]);
119+
120+
channel.exchangeBind("e0", "e1", "");
121+
publishWithMarker("e1", "");
122+
consumeNoDuplicates(consumers[0]);
123+
consumeNoDuplicates(consumers[1]);
124+
125+
channel.exchangeBind("e1", "e2", "");
126+
publishWithMarker("e2", "");
127+
consumeNoDuplicates(consumers[0]);
128+
consumeNoDuplicates(consumers[1]);
129+
consumeNoDuplicates(consumers[2]);
130+
131+
channel.exchangeUnbind("e0", "e1", "");
132+
channel.exchangeUnbind("e1", "e2", "");
133+
}
134+
135+
/* pre (eN --> qN) for N in [0..2]
136+
* add binding (e0 --> q1)
137+
* test (e0 --> {q0, q1})
138+
* add binding (e1 --> e0)
139+
* resulting in: (e1 --> {q1, e0 --> {q0, q1}})
140+
* test (e1 --> {q0, q1})
141+
*/
142+
public void testDuplicateQueueDestinations() throws IOException,
143+
ShutdownSignalException, InterruptedException {
144+
channel.queueBind("q1", "e0", "");
145+
publishWithMarker("e0", "");
146+
consumeNoDuplicates(consumers[0]);
147+
consumeNoDuplicates(consumers[1]);
148+
149+
channel.exchangeBind("e0", "e1", "");
150+
151+
publishWithMarker("e1", "");
152+
consumeNoDuplicates(consumers[0]);
153+
consumeNoDuplicates(consumers[1]);
154+
155+
channel.exchangeUnbind("e0", "e1", "");
156+
}
157+
158+
/* pre (eN --> qN) for N in [0..2]
159+
* add binding (e1 --> e0)
160+
* add binding (e2 --> e1)
161+
* add binding (e0 --> e2)
162+
* test (eN --> {q0, q1, q2}) for N in [0..2]
163+
*/
164+
public void testExchangeRoutingLoop() throws IOException,
165+
ShutdownSignalException, InterruptedException {
166+
channel.exchangeBind("e0", "e1", "");
167+
channel.exchangeBind("e1", "e2", "");
168+
channel.exchangeBind("e2", "e0", "");
169+
170+
for (String e : exchanges) {
171+
publishWithMarker(e, "");
172+
for (QueueingConsumer c : consumers) {
173+
consumeNoDuplicates(c);
174+
}
175+
}
176+
177+
channel.exchangeUnbind("e0", "e1", "");
178+
channel.exchangeUnbind("e1", "e2", "");
179+
channel.exchangeUnbind("e2", "e0", "");
180+
}
181+
182+
/* pre (eN --> qN) for N in [0..2]
183+
* create topic e and bind e --> eN with rk eN for N in [0..2]
184+
* test publish with rk to e
185+
* create direct ef and bind e --> ef with rk #
186+
* bind ef --> eN with rk eN for N in [0..2]
187+
* test publish with rk to e
188+
* ( end up with: e -(#)-> ef -(eN)-> eN --> qN;
189+
* e -(eN)-> eN for N in [0..2] )
190+
* Then remove the first set of bindings from e --> eN for N in [0..2]
191+
* test publish with rk to e
192+
*/
193+
public void testTopicExchange() throws IOException, ShutdownSignalException,
194+
InterruptedException {
195+
196+
channel.exchangeDeclare("e", "topic");
197+
198+
for (String e : exchanges) {
199+
channel.exchangeBind(e, "e", e);
200+
}
201+
publishAndConsumeAll("e");
202+
203+
channel.exchangeDeclare("ef", "direct");
204+
channel.exchangeBind("ef", "e", "#");
205+
206+
for (String e : exchanges) {
207+
channel.exchangeBind(e, "ef", e);
208+
}
209+
publishAndConsumeAll("e");
210+
211+
for (String e : exchanges) {
212+
channel.exchangeUnbind(e, "e", e);
213+
}
214+
publishAndConsumeAll("e");
215+
216+
channel.exchangeDelete("ef");
217+
channel.exchangeDelete("e");
218+
}
219+
220+
protected void publishAndConsumeAll(String exchange)
221+
throws IOException, ShutdownSignalException, InterruptedException {
222+
223+
for (String e : exchanges) {
224+
publishWithMarker(exchange, e);
225+
}
226+
for (QueueingConsumer c : consumers) {
227+
consumeNoDuplicates(c);
228+
}
229+
}
230+
231+
}

0 commit comments

Comments
 (0)