Skip to content

Commit c6f5279

Browse files
Merge pull request #244 from rabbitmq/rabbitmq-java-client-221
Remove QueueingConsumer from RpcServer
2 parents e1bc4cd + 498bbb0 commit c6f5279

File tree

4 files changed

+254
-9
lines changed

4 files changed

+254
-9
lines changed

src/main/java/com/rabbitmq/client/RpcServer.java

+150-8
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616

1717
package com.rabbitmq.client;
1818

19+
import com.rabbitmq.utility.Utility;
20+
1921
import java.io.IOException;
22+
import java.util.concurrent.BlockingQueue;
23+
import java.util.concurrent.LinkedBlockingQueue;
2024

2125
/**
2226
* Class which manages a request queue for a simple RPC-style service.
@@ -28,10 +32,10 @@ public class RpcServer {
2832
/** Queue to receive requests from */
2933
private final String _queueName;
3034
/** Boolean controlling the exit from the mainloop. */
31-
private boolean _mainloopRunning = true;
35+
private volatile boolean _mainloopRunning = true;
3236

3337
/** Consumer attached to our request queue */
34-
private QueueingConsumer _consumer;
38+
private RpcConsumer _consumer;
3539

3640
/**
3741
* Creates an RpcServer listening on a temporary exclusive
@@ -80,10 +84,10 @@ public void close()
8084
* @throws IOException if an error is encountered
8185
* @return the newly created and registered consumer
8286
*/
83-
protected QueueingConsumer setupConsumer()
87+
protected RpcConsumer setupConsumer()
8488
throws IOException
8589
{
86-
QueueingConsumer consumer = new QueueingConsumer(_channel);
90+
RpcConsumer consumer = new DefaultRpcConsumer(_channel);
8791
_channel.basicConsume(_queueName, consumer);
8892
return consumer;
8993
}
@@ -106,7 +110,7 @@ public ShutdownSignalException mainloop()
106110
{
107111
try {
108112
while (_mainloopRunning) {
109-
QueueingConsumer.Delivery request;
113+
Delivery request;
110114
try {
111115
request = _consumer.nextDelivery();
112116
} catch (InterruptedException ie) {
@@ -136,7 +140,7 @@ public void terminateMainloop() {
136140
/**
137141
* Private API - Process a single request. Called from mainloop().
138142
*/
139-
public void processRequest(QueueingConsumer.Delivery request)
143+
public void processRequest(Delivery request)
140144
throws IOException
141145
{
142146
AMQP.BasicProperties requestProperties = request.getProperties();
@@ -157,7 +161,7 @@ public void processRequest(QueueingConsumer.Delivery request)
157161
* Lowest-level response method. Calls
158162
* handleCall(AMQP.BasicProperties,byte[],AMQP.BasicProperties).
159163
*/
160-
public byte[] handleCall(QueueingConsumer.Delivery request,
164+
public byte[] handleCall(Delivery request,
161165
AMQP.BasicProperties replyProperties)
162166
{
163167
return handleCall(request.getProperties(),
@@ -191,7 +195,7 @@ public byte[] handleCall(byte[] requestBody,
191195
* Lowest-level handler method. Calls
192196
* handleCast(AMQP.BasicProperties,byte[]).
193197
*/
194-
public void handleCast(QueueingConsumer.Delivery request)
198+
public void handleCast(Delivery request)
195199
{
196200
handleCast(request.getProperties(), request.getBody());
197201
}
@@ -230,5 +234,143 @@ public Channel getChannel() {
230234
public String getQueueName() {
231235
return _queueName;
232236
}
237+
238+
/**
239+
* Encapsulates an arbitrary message - simple "bean" holder structure.
240+
*/
241+
public static class Delivery {
242+
private final Envelope _envelope;
243+
private final AMQP.BasicProperties _properties;
244+
private final byte[] _body;
245+
246+
public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
247+
_envelope = envelope;
248+
_properties = properties;
249+
_body = body;
250+
}
251+
252+
/**
253+
* Retrieve the message envelope.
254+
* @return the message envelope
255+
*/
256+
public Envelope getEnvelope() {
257+
return _envelope;
258+
}
259+
260+
/**
261+
* Retrieve the message properties.
262+
* @return the message properties
263+
*/
264+
public AMQP.BasicProperties getProperties() {
265+
return _properties;
266+
}
267+
268+
/**
269+
* Retrieve the message body.
270+
* @return the message body
271+
*/
272+
public byte[] getBody() {
273+
return _body;
274+
}
275+
}
276+
277+
public interface RpcConsumer extends Consumer {
278+
279+
Delivery nextDelivery() throws InterruptedException, ShutdownSignalException, ConsumerCancelledException;
280+
281+
String getConsumerTag();
282+
283+
}
284+
285+
private static class DefaultRpcConsumer extends DefaultConsumer implements RpcConsumer {
286+
287+
// Marker object used to signal the queue is in shutdown mode.
288+
// It is only there to wake up consumers. The canonical representation
289+
// of shutting down is the presence of _shutdown.
290+
// Invariant: This is never on _queue unless _shutdown != null.
291+
private static final Delivery POISON = new Delivery(null, null, null);
292+
private final BlockingQueue<Delivery> _queue;
293+
// When this is non-null the queue is in shutdown mode and nextDelivery should
294+
// throw a shutdown signal exception.
295+
private volatile ShutdownSignalException _shutdown;
296+
private volatile ConsumerCancelledException _cancelled;
297+
298+
public DefaultRpcConsumer(Channel ch) {
299+
this(ch, new LinkedBlockingQueue<>());
300+
}
301+
302+
public DefaultRpcConsumer(Channel ch, BlockingQueue<Delivery> q) {
303+
super(ch);
304+
this._queue = q;
305+
}
306+
307+
@Override
308+
public Delivery nextDelivery() throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
309+
return handle(_queue.take());
310+
}
311+
312+
@Override
313+
public void handleShutdownSignal(String consumerTag,
314+
ShutdownSignalException sig) {
315+
_shutdown = sig;
316+
_queue.add(POISON);
317+
}
318+
319+
@Override
320+
public void handleCancel(String consumerTag) throws IOException {
321+
_cancelled = new ConsumerCancelledException();
322+
_queue.add(POISON);
323+
}
324+
325+
@Override
326+
public void handleDelivery(String consumerTag,
327+
Envelope envelope,
328+
AMQP.BasicProperties properties,
329+
byte[] body)
330+
throws IOException {
331+
checkShutdown();
332+
this._queue.add(new Delivery(envelope, properties, body));
333+
}
334+
335+
/**
336+
* Check if we are in shutdown mode and if so throw an exception.
337+
*/
338+
private void checkShutdown() {
339+
if (_shutdown != null)
340+
throw Utility.fixStackTrace(_shutdown);
341+
}
342+
343+
/**
344+
* If delivery is not POISON nor null, return it.
345+
* <p/>
346+
* If delivery, _shutdown and _cancelled are all null, return null.
347+
* <p/>
348+
* If delivery is POISON re-insert POISON into the queue and
349+
* throw an exception if POISONed for no reason.
350+
* <p/>
351+
* Otherwise, if we are in shutdown mode or cancelled,
352+
* throw a corresponding exception.
353+
*/
354+
private Delivery handle(Delivery delivery) {
355+
if (delivery == POISON ||
356+
delivery == null && (_shutdown != null || _cancelled != null)) {
357+
if (delivery == POISON) {
358+
_queue.add(POISON);
359+
if (_shutdown == null && _cancelled == null) {
360+
throw new IllegalStateException(
361+
"POISON in queue, but null _shutdown and null _cancelled. " +
362+
"This should never happen, please report as a BUG");
363+
}
364+
}
365+
if (null != _shutdown)
366+
throw Utility.fixStackTrace(_shutdown);
367+
if (null != _cancelled)
368+
throw Utility.fixStackTrace(_cancelled);
369+
}
370+
return delivery;
371+
}
372+
}
373+
374+
233375
}
234376

src/test/java/com/rabbitmq/client/test/ClientTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@
4545
DnsRecordIpAddressResolverTests.class,
4646
StandardMetricsCollectorTest.class,
4747
DnsSrvRecordAddressResolverTest.class,
48-
JavaNioTest.class
48+
JavaNioTest.class,
49+
RpcTest.class
4950
})
5051
public class ClientTests {
5152

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
17+
package com.rabbitmq.client.test;
18+
19+
import com.rabbitmq.client.*;
20+
import org.junit.After;
21+
import org.junit.Assert;
22+
import org.junit.Before;
23+
import org.junit.Test;
24+
25+
import java.io.IOException;
26+
import java.util.concurrent.TimeoutException;
27+
28+
import static org.junit.Assert.assertEquals;
29+
30+
public class RpcTest {
31+
32+
Connection clientConnection, serverConnection;
33+
Channel clientChannel, serverChannel;
34+
String queue = "rpc.queue";
35+
RpcServer rpcServer;
36+
37+
38+
@Before public void init() throws Exception {
39+
clientConnection = TestUtils.connectionFactory().newConnection();
40+
clientChannel = clientConnection.createChannel();
41+
serverConnection = TestUtils.connectionFactory().newConnection();
42+
serverChannel = serverConnection.createChannel();
43+
serverChannel.queueDeclare(queue, false, false, false, null);
44+
}
45+
46+
@After public void tearDown() throws Exception {
47+
if(rpcServer != null) {
48+
rpcServer.terminateMainloop();
49+
}
50+
if(serverChannel != null) {
51+
serverChannel.queueDelete(queue);
52+
}
53+
TestUtils.close(clientConnection);
54+
TestUtils.close(serverConnection);
55+
}
56+
57+
@Test
58+
public void rpc() throws Exception {
59+
rpcServer = new TestRpcServer(serverChannel, queue);
60+
new Thread(() -> {
61+
try {
62+
rpcServer.mainloop();
63+
} catch (Exception e) {
64+
// safe to ignore when loops ends/server is canceled
65+
}
66+
}).start();
67+
68+
RpcClient client = new RpcClient(clientChannel, "", queue, 1000);
69+
byte[] response = client.primitiveCall("hello".getBytes());
70+
assertEquals("*** hello ***", new String(response));
71+
client.close();
72+
}
73+
74+
75+
76+
private static class TestRpcServer extends RpcServer {
77+
78+
public TestRpcServer(Channel channel, String queueName) throws IOException {
79+
super(channel, queueName);
80+
}
81+
82+
@Override
83+
public byte[] handleCall(Delivery request, AMQP.BasicProperties replyProperties) {
84+
String input = new String(request.getBody());
85+
return ("*** " + input + " ***").getBytes();
86+
}
87+
}
88+
89+
}

src/test/java/com/rabbitmq/client/test/TestUtils.java

+13
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515

1616
package com.rabbitmq.client.test;
1717

18+
import com.rabbitmq.client.Connection;
1819
import com.rabbitmq.client.ConnectionFactory;
1920

21+
import java.io.IOException;
22+
2023
public class TestUtils {
2124

2225
public static final boolean USE_NIO = System.getProperty("use.nio") == null ? false : true;
@@ -31,4 +34,14 @@ public static ConnectionFactory connectionFactory() {
3134
return connectionFactory;
3235
}
3336

37+
public static void close(Connection connection) {
38+
if(connection != null) {
39+
try {
40+
connection.close();
41+
} catch (IOException e) {
42+
throw new RuntimeException(e);
43+
}
44+
}
45+
}
46+
3447
}

0 commit comments

Comments
 (0)