forked from rabbitmq/rabbitmq-java-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRpcClient.java
511 lines (461 loc) · 18.9 KB
/
RpcClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.client;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import com.rabbitmq.client.impl.MethodArgumentReader;
import com.rabbitmq.client.impl.MethodArgumentWriter;
import com.rabbitmq.client.impl.ValueReader;
import com.rabbitmq.client.impl.ValueWriter;
import com.rabbitmq.utility.BlockingCell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Convenience class which manages simple RPC-style communication.
 * The class is agnostic about the format of RPC arguments / return values.
 * It simply provides a mechanism for sending a message to an exchange with a given routing key,
 * and waiting for a response.
 * <p>
 * Every outstanding request is tracked in a continuation map keyed by correlation ID.
 * All accesses this class makes to that map are performed while holding the map's own monitor,
 * because the map is touched both by calling threads and by the consumer / return-listener callbacks.
 */
public class RpcClient implements AutoCloseable {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);

    /** Channel we are communicating on */
    private final Channel _channel;
    /** Exchange to send requests to */
    private final String _exchange;
    /** Routing key to use for requests */
    private final String _routingKey;
    /** Queue where the server should put the reply */
    private final String _replyTo;
    /** timeout to use on call responses */
    private final int _timeout;
    /** NO_TIMEOUT value must match convention on {@link BlockingCell#uninterruptibleGet(int)} */
    protected final static int NO_TIMEOUT = -1;
    /** Whether to publish RPC requests with the mandatory flag or not. */
    private final boolean _useMandatory;
    /** closed flag */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Default conversion of the raw object stored in a continuation into a {@link Response}.
     * <p>
     * A {@link ShutdownSignalException} is re-thrown wrapped in a fresh instance (with the
     * original set as its cause), an {@link UnroutableRpcRequestException} is re-thrown as is,
     * and anything else is cast to {@link Response}.
     */
    public final static Function<Object, Response> DEFAULT_REPLY_HANDLER = reply -> {
        if (reply instanceof ShutdownSignalException) {
            ShutdownSignalException sig = (ShutdownSignalException) reply;
            ShutdownSignalException wrapper =
                new ShutdownSignalException(sig.isHardError(),
                                            sig.isInitiatedByApplication(),
                                            sig.getReason(),
                                            sig.getReference());
            wrapper.initCause(sig);
            throw wrapper;
        } else if (reply instanceof UnroutableRpcRequestException) {
            throw (UnroutableRpcRequestException) reply;
        } else {
            return (Response) reply;
        }
    };

    /** Converts the raw continuation value into the {@link Response} returned to the caller */
    private final Function<Object, Response> _replyHandler;

    /** Map from request correlation ID to continuation BlockingCell */
    private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<>();

    /**
     * Generates correlation ID for each request.
     *
     * @since 5.9.0
     */
    private final Supplier<String> _correlationIdSupplier;

    /** Return listener registered on the channel, or {@code null} when the mandatory flag is not used */
    private final ReturnListener _returnListener;

    /** Most recently generated correlation ID; written while holding the continuation map monitor */
    private String lastCorrelationId = "0";

    /** Consumer attached to our reply queue */
    private final DefaultConsumer _consumer;

    /**
     * Construct a {@link RpcClient} with the passed-in {@link RpcClientParams}.
     *
     * @param params the settings (channel, exchange, routing key, reply queue, timeout, ...) to use
     * @throws IOException if registering the reply consumer fails
     * @throws IllegalArgumentException if the configured timeout is lower than {@link #NO_TIMEOUT}
     * @see RpcClientParams
     * @since 5.6.0
     */
    public RpcClient(RpcClientParams params) throws IOException {
        _channel = params.getChannel();
        _exchange = params.getExchange();
        _routingKey = params.getRoutingKey();
        _replyTo = params.getReplyTo();
        _timeout = requireValidTimeout(params.getTimeout());
        _useMandatory = params.shouldUseMandatory();
        _replyHandler = params.getReplyHandler();
        _correlationIdSupplier = params.getCorrelationIdSupplier();
        _consumer = setupConsumer();
        if (_useMandatory) {
            // A returned (unroutable) request completes its continuation with an exception.
            this._returnListener = this._channel.addReturnListener(returnMessage -> {
                resolveContinuation(returnMessage.getProperties().getCorrelationId(),
                                    new UnroutableRpcRequestException(returnMessage));
            });
        } else {
            this._returnListener = null;
        }
    }

    /**
     * Private API - rejects timeouts lower than {@link #NO_TIMEOUT}.
     *
     * @param timeout the timeout to validate
     * @return the same timeout, when valid
     * @throws IllegalArgumentException if the timeout is lower than {@link #NO_TIMEOUT}
     */
    private static int requireValidTimeout(int timeout) {
        if (timeout < NO_TIMEOUT) {
            throw new IllegalArgumentException("Timeout argument must be NO_TIMEOUT(-1) or non-negative.");
        }
        return timeout;
    }

    /**
     * Private API - removes the continuation registered for a correlation ID and completes it.
     * Logs a warning when no continuation is registered for that ID.
     *
     * @param replyId the correlation ID carried by the incoming message
     * @param outcome the value to hand over to the waiting caller
     */
    private void resolveContinuation(String replyId, Object outcome) {
        synchronized (_continuationMap) {
            BlockingCell<Object> blocker = _continuationMap.remove(replyId);
            if (blocker == null) {
                // Entry should have been removed if request timed out,
                // log a warning nevertheless.
                LOGGER.warn("No outstanding request for correlation ID {}", replyId);
            } else {
                blocker.set(outcome);
            }
        }
    }

    /**
     * Private API - forgets the continuation of a request that will never be awaited again.
     *
     * @param replyId the correlation ID of the abandoned request
     */
    private void discardContinuation(String replyId) {
        synchronized (_continuationMap) {
            _continuationMap.remove(replyId);
        }
    }

    /**
     * Private API - ensures the RpcClient is correctly open.
     * @throws IOException if an error is encountered
     */
    private void checkNotClosed() throws IOException {
        if (this.closed.get()) {
            throw new EOFException("RpcClient is closed");
        }
    }

    /**
     * Public API - cancels the consumer, thus deleting the temporary queue, and marks the RpcClient as closed.
     * Only the first invocation has an effect. The return listener (if one was registered) is
     * removed even when cancelling the consumer fails.
     * @throws IOException if an error is encountered
     */
    @Override
    public void close() throws IOException {
        if (this.closed.compareAndSet(false, true)) {
            try {
                _channel.basicCancel(_consumer.getConsumerTag());
            } finally {
                // No listener was registered when the mandatory flag is off.
                if (this._returnListener != null) {
                    _channel.removeReturnListener(this._returnListener);
                }
            }
        }
    }

    /**
     * Registers a consumer on the reply queue.
     * @throws IOException if an error is encountered
     * @return the newly created and registered consumer
     */
    protected DefaultConsumer setupConsumer() throws IOException {
        DefaultConsumer consumer = new DefaultConsumer(_channel) {
            @Override
            public void handleShutdownSignal(String consumerTag,
                                             ShutdownSignalException signal) {
                synchronized (_continuationMap) {
                    for (Entry<String, BlockingCell<Object>> entry : _continuationMap.entrySet()) {
                        entry.getValue().set(signal);
                    }
                    // Every waiter has been signalled: drop the entries so that no
                    // completed cell stays reachable and gets set a second time.
                    _continuationMap.clear();
                    closed.set(true);
                }
            }

            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) {
                resolveContinuation(properties.getCorrelationId(),
                                    new Response(consumerTag, envelope, properties, body));
            }
        };
        _channel.basicConsume(_replyTo, true, consumer);
        return consumer;
    }

    /**
     * Publishes a message to the configured exchange with the configured routing key,
     * using the mandatory flag as configured.
     *
     * @param props the properties of the message
     * @param message the body of the message
     * @throws IOException if an error is encountered
     */
    public void publish(AMQP.BasicProperties props, byte[] message)
        throws IOException {
        _channel.basicPublish(_exchange, _routingKey, _useMandatory, props, message);
    }

    /**
     * Perform an RPC roundtrip using the configured timeout.
     *
     * @param props the request properties, may be {@code null}
     * @param message the byte array request message to send
     * @return the response received
     * @throws IOException if an error is encountered
     * @throws TimeoutException if a response is not received within the configured timeout
     */
    public Response doCall(AMQP.BasicProperties props, byte[] message)
        throws IOException, TimeoutException {
        return doCall(props, message, _timeout);
    }

    /**
     * Perform an RPC roundtrip: registers a continuation under a new correlation ID, publishes
     * the request with that correlation ID and the reply queue set, then waits for the outcome.
     * <p>
     * The continuation is removed again if publishing fails or the wait times out.
     *
     * @param props the request properties, may be {@code null}; correlation ID and reply-to are overwritten
     * @param message the byte array request message to send
     * @param timeout milliseconds before timing out on wait for response, or {@link #NO_TIMEOUT}
     * @return the response received, as produced by the configured reply handler
     * @throws ShutdownSignalException if the connection dies during our wait
     * @throws IOException if an error is encountered, or if this client is closed
     * @throws TimeoutException if a response is not received within the timeout
     * @throws IllegalArgumentException if the timeout is lower than {@link #NO_TIMEOUT}
     */
    public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
        throws IOException, ShutdownSignalException, TimeoutException {
        checkNotClosed();
        // Fail before anything is registered or sent, consistently with the constructor check.
        requireValidTimeout(timeout);
        BlockingCell<Object> k = new BlockingCell<>();
        String replyId;
        synchronized (_continuationMap) {
            replyId = _correlationIdSupplier.get();
            lastCorrelationId = replyId;
            props = ((props == null) ? new AMQP.BasicProperties.Builder() : props.builder())
                .correlationId(replyId).replyTo(_replyTo).build();
            _continuationMap.put(replyId, k);
        }
        Object reply;
        try {
            publish(props, message);
            reply = k.uninterruptibleGet(timeout);
        } catch (IOException | RuntimeException | TimeoutException ex) {
            // Avoid potential leak. This entry is no longer needed by caller.
            discardContinuation(replyId);
            throw ex;
        }
        return _replyHandler.apply(reply);
    }

    /**
     * Perform a byte-array-based RPC roundtrip with explicit request properties,
     * using the configured timeout.
     *
     * @param props the request properties, may be {@code null}
     * @param message the byte array request message to send
     * @return the byte array response received
     * @throws ShutdownSignalException if the connection dies during our wait
     * @throws IOException if an error is encountered
     * @throws TimeoutException if a response is not received within the configured timeout
     */
    public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
        throws IOException, ShutdownSignalException, TimeoutException {
        return primitiveCall(props, message, _timeout);
    }

    /**
     * Perform a byte-array-based RPC roundtrip with explicit request properties and timeout.
     *
     * @param props the request properties, may be {@code null}
     * @param message the byte array request message to send
     * @param timeout milliseconds before timing out on wait for response
     * @return the byte array response received
     * @throws ShutdownSignalException if the connection dies during our wait
     * @throws IOException if an error is encountered
     * @throws TimeoutException if a response is not received within the timeout
     */
    public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message, int timeout)
        throws IOException, ShutdownSignalException, TimeoutException {
        return doCall(props, message, timeout).getBody();
    }

    /**
     * Perform a simple byte-array-based RPC roundtrip.
     * @param message the byte array request message to send
     * @return the byte array response received
     * @throws ShutdownSignalException if the connection dies during our wait
     * @throws IOException if an error is encountered
     * @throws TimeoutException if a response is not received within the configured timeout
     */
    public byte[] primitiveCall(byte[] message)
        throws IOException, ShutdownSignalException, TimeoutException {
        return primitiveCall(null, message);
    }

    /**
     * Perform a simple byte-array-based RPC roundtrip
     *
     * Useful if you need to get at more than just the body of the message
     *
     * @param message the byte array request message to send
     * @return The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer
     * @throws ShutdownSignalException if the connection dies during our wait
     * @throws IOException if an error is encountered
     * @throws TimeoutException if a response is not received within the configured timeout
     */
    public Response responseCall(byte[] message) throws IOException, ShutdownSignalException, TimeoutException {
        return responseCall(message, _timeout);
    }

    /**
     * Perform a simple byte-array-based RPC roundtrip
     *
     * Useful if you need to get at more than just the body of the message
     *
     * @param message the byte array request message to send
     * @param timeout milliseconds before timing out on wait for response
     * @return The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer
     * @throws ShutdownSignalException if the connection dies during our wait
     * @throws IOException if an error is encountered
     * @throws TimeoutException if a response is not received within the configured timeout
     */
    public Response responseCall(byte[] message, int timeout) throws IOException, ShutdownSignalException, TimeoutException {
        return doCall(null, message, timeout);
    }

    /**
     * Perform a simple string-based RPC roundtrip.
     * The request and the reply are converted with {@link StringRpcServer#STRING_ENCODING};
     * if that conversion fails, the platform default charset is used instead.
     * @param message the string request message to send
     * @return the string response received
     * @throws ShutdownSignalException if the connection dies during our wait
     * @throws IOException if an error is encountered
     * @throws TimeoutException if a timeout occurs before the response is received
     */
    @SuppressWarnings("unused")
    public String stringCall(String message)
        throws IOException, ShutdownSignalException, TimeoutException {
        byte[] request;
        try {
            request = message.getBytes(StringRpcServer.STRING_ENCODING);
        } catch (IOException ignored) {
            // Deliberate best-effort fallback to the platform default charset.
            request = message.getBytes();
        }
        byte[] reply = primitiveCall(request);
        try {
            return new String(reply, StringRpcServer.STRING_ENCODING);
        } catch (IOException ignored) {
            // Deliberate best-effort fallback to the platform default charset.
            return new String(reply);
        }
    }

    /**
     * Perform an AMQP wire-protocol-table based RPC roundtrip <br><br>
     *
     * There are some restrictions on the values appearing in the table: <br>
     * they must be of type {@link String}, {@link LongString}, {@link Integer}, {@link java.math.BigDecimal}, {@link Date},
     * or (recursively) a {@link Map} of the enclosing type.
     *
     * @param message the table to send
     * @return the table received
     * @throws ShutdownSignalException if the connection dies during our wait
     * @throws IOException if an error is encountered
     * @throws TimeoutException if a timeout occurs before a response is received
     */
    public Map<String, Object> mapCall(Map<String, Object> message)
        throws IOException, ShutdownSignalException, TimeoutException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        MethodArgumentWriter writer = new MethodArgumentWriter(new ValueWriter(new DataOutputStream(buffer)));
        writer.writeTable(message);
        writer.flush();
        byte[] reply = primitiveCall(buffer.toByteArray());
        MethodArgumentReader reader =
            new MethodArgumentReader(new ValueReader(new DataInputStream(new ByteArrayInputStream(reply))));
        return reader.readTable();
    }

    /**
     * Perform an AMQP wire-protocol-table based RPC roundtrip, first
     * constructing the table from an array of alternating keys (in
     * even-numbered elements, starting at zero) and values (in
     * odd-numbered elements, starting at one) <br>
     * Restrictions on value arguments apply as in {@link RpcClient#mapCall(Map)}.
     *
     * @param keyValuePairs alternating {key, value, key, value, ...} data to send
     * @return the table received
     * @throws ShutdownSignalException if the connection dies during our wait
     * @throws IOException if an error is encountered
     * @throws TimeoutException if a timeout occurs before a response is received
     * @throws IllegalArgumentException if the array holds an odd number of elements
     */
    public Map<String, Object> mapCall(Object[] keyValuePairs)
        throws IOException, ShutdownSignalException, TimeoutException {
        if (keyValuePairs.length % 2 != 0) {
            // A trailing key without a value would otherwise fail with an index error mid-loop.
            throw new IllegalArgumentException(
                "keyValuePairs must contain an even number of elements (alternating key, value), got "
                    + keyValuePairs.length);
        }
        Map<String, Object> message = new HashMap<>();
        for (int i = 0; i < keyValuePairs.length; i += 2) {
            message.put((String) keyValuePairs[i], keyValuePairs[i + 1]);
        }
        return mapCall(message);
    }

    /**
     * Retrieve the channel.
     * @return the channel to which this client is connected
     */
    public Channel getChannel() {
        return _channel;
    }

    /**
     * Retrieve the exchange.
     * @return the exchange to which this client is connected
     */
    public String getExchange() {
        return _exchange;
    }

    /**
     * Retrieve the routing key.
     * @return the routing key for messages to this client
     */
    public String getRoutingKey() {
        return _routingKey;
    }

    /**
     * Retrieve the continuation map.
     * <p>
     * The returned map is the live internal instance, not a copy; this class synchronizes
     * on the map itself whenever it reads or modifies it.
     * @return the map of objects to blocking cells for this client
     */
    public Map<String, BlockingCell<Object>> getContinuationMap() {
        return _continuationMap;
    }

    /**
     * Retrieve the last correlation id used.
     * <p>
     * Note as of 5.9.0, correlation IDs may not always be integers
     * (by default, they are).
     * This method will try to parse the last correlation ID string
     * as an integer, so this may result in {@link NumberFormatException}
     * if the correlation ID supplier provided by
     * {@link RpcClientParams#correlationIdSupplier(Supplier)}
     * does not generate appropriate IDs.
     *
     * @return the most recently used correlation id
     * @see RpcClientParams#correlationIdSupplier(Supplier)
     */
    public int getCorrelationId() {
        String latest;
        // Read under the same monitor the writer holds, so the latest value is visible.
        synchronized (_continuationMap) {
            latest = this.lastCorrelationId;
        }
        return Integer.parseInt(latest);
    }

    /**
     * Retrieve the consumer.
     * @return an interface to the client's consumer object
     */
    public Consumer getConsumer() {
        return _consumer;
    }

    /**
     * The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer
     */
    public static class Response {
        protected String consumerTag;
        protected Envelope envelope;
        protected AMQP.BasicProperties properties;
        protected byte[] body;

        /** Creates an empty response; all fields are left {@code null}. */
        public Response() {
        }

        /**
         * Creates a response holding the arguments of a delivery.
         *
         * @param consumerTag the consumer tag of the delivery
         * @param envelope the envelope of the delivery
         * @param properties the properties of the delivered message
         * @param body the body of the delivered message
         */
        public Response(
            final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties,
            final byte[] body) {
            this.consumerTag = consumerTag;
            this.envelope = envelope;
            this.properties = properties;
            this.body = body;
        }

        /** @return the consumer tag of the delivery */
        public String getConsumerTag() {
            return consumerTag;
        }

        /** @return the envelope of the delivery */
        public Envelope getEnvelope() {
            return envelope;
        }

        /** @return the properties of the delivered message */
        public AMQP.BasicProperties getProperties() {
            return properties;
        }

        /** @return the body of the delivered message (the stored array, not a copy) */
        public byte[] getBody() {
            return body;
        }
    }

    /**
     * Creates correlation IDs as a sequence of integers.
     *
     * @return a supplier producing "1", "2", "3", ...
     * @see RpcClientParams#correlationIdSupplier(Supplier)
     * @since 5.9.0
     */
    public static Supplier<String> incrementingCorrelationIdSupplier() {
        return incrementingCorrelationIdSupplier("");
    }

    /**
     * Creates correlation IDs as a sequence of integers, with the provided prefix.
     *
     * @param prefix the string prepended to each generated integer
     * @return a supplier producing prefix + "1", prefix + "2", ...
     * @see RpcClientParams#correlationIdSupplier(Supplier)
     * @since 5.9.0
     */
    public static Supplier<String> incrementingCorrelationIdSupplier(String prefix) {
        return new IncrementingCorrelationIdSupplier(prefix);
    }

    /**
     * Supplier of prefixed, incrementing correlation IDs.
     * <p>
     * The counter is a plain {@code int} with no synchronization of its own;
     * {@link RpcClient#doCall(AMQP.BasicProperties, byte[], int)} invokes the supplier
     * while holding the continuation map monitor.
     *
     * @since 5.9.0
     */
    private static class IncrementingCorrelationIdSupplier implements Supplier<String> {

        private final String prefix;
        private int correlationId;

        public IncrementingCorrelationIdSupplier(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String get() {
            return prefix + ++correlationId;
        }
    }
}