forked from rabbitmq/rabbitmq-java-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAMQChannel.java
639 lines (566 loc) · 23.2 KB
/
AMQChannel.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
package com.rabbitmq.client.impl;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.Basic;
import com.rabbitmq.client.AMQP.Confirm;
import com.rabbitmq.client.AMQP.Exchange;
import com.rabbitmq.client.AMQP.Queue;
import com.rabbitmq.client.AMQP.Tx;
import com.rabbitmq.client.Method;
import com.rabbitmq.client.observation.ObservationCollector;
import com.rabbitmq.utility.BlockingValueOrException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
/**
* Base class modelling an AMQ channel. Subclasses implement
* {@link com.rabbitmq.client.Channel#close} and
* {@link #processAsync processAsync()}, and may choose to override
* {@link #processShutdownSignal processShutdownSignal()} and
* {@link #rpc rpc()}.
*
* @see ChannelN
* @see Connection
*/
public abstract class AMQChannel extends ShutdownNotifierComponent {
/** Class-wide logger; used in this class to report failures while cleaning up RPC state after a timeout. */
private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class);
/** Value of {@link #_rpcTimeout} meaning that blocking RPC calls wait for their reply without a time limit. */
protected static final int NO_RPC_TIMEOUT = 0;
/**
 * Protected; used instead of synchronizing on the channel itself,
 * so that clients can themselves use the channel to synchronize
 * on.
 */
protected final ReentrantLock _channelLock = new ReentrantLock();
/**
 * Condition bound to {@link #_channelLock}. Threads await it while another RPC is
 * outstanding or while content transmission is blocked; it is signalled when the
 * outstanding RPC is taken and when a shutdown signal is processed.
 */
protected final Condition _channelLockCondition = _channelLock.newCondition();
/** The connection this channel is associated with. */
private final AMQConnection _connection;
/** This channel's channel number. */
private final int _channelNumber;
/** Command currently being assembled from inbound frames; replaced once a command is complete. */
private AMQCommand _command;
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
private RpcWrapper _activeRpc = null;
/** Whether transmission of content-bearing methods should be blocked */
protected volatile boolean _blockContent = false;
/** Timeout for RPC calls, read from the connection at construction; {@link #NO_RPC_TIMEOUT} disables it. */
protected final int _rpcTimeout;
/** When true, an inbound reply that the waiting RPC cannot handle is discarded instead of completing it. */
private final boolean _checkRpcResponseType;
/** Listener notified of each complete inbound command and of each outbound command before transmission. */
private final TrafficListener _trafficListener;
/** Value passed to every {@link AMQCommand} created for inbound assembly (presumably the maximum accepted body size). */
private final int maxInboundMessageBodySize;
/** Connection information obtained from the connection; exposed to subclasses via {@link #connectionInfo()}. */
private final ObservationCollector.ConnectionInfo connectionInfo;
/**
* Construct a channel on the given connection, with the given channel number.
* @param connection the underlying connection for this channel
* @param channelNumber the allocated reference number for this channel
*/
public AMQChannel(AMQConnection connection, int channelNumber) {
this._connection = connection;
this._channelNumber = channelNumber;
if(connection.getChannelRpcTimeout() < 0) {
throw new IllegalArgumentException("Continuation timeout on RPC calls cannot be less than 0");
}
this._rpcTimeout = connection.getChannelRpcTimeout();
this._checkRpcResponseType = connection.willCheckRpcResponseType();
this._trafficListener = connection.getTrafficListener();
this.maxInboundMessageBodySize = connection.getMaxInboundMessageBodySize();
this._command = new AMQCommand(this.maxInboundMessageBodySize);
this.connectionInfo = connection.connectionInfo();
}
/**
 * Public API - returns the number that identifies this channel.
 *
 * @return the channel number given at construction
 */
public int getChannelNumber() {
    return this._channelNumber;
}
/**
 * Private API - entry point used by the connection to hand over a frame
 * addressed to this channel. The frame is fed to the command currently being
 * assembled; once that command is complete a fresh assembler is installed and
 * the finished command is dispatched.
 *
 * @param frame the incoming frame
 * @throws IOException if an error is encountered
 */
public void handleFrame(Frame frame) throws IOException {
    final AMQCommand assembling = _command;
    final boolean complete = assembling.handleFrame(frame);
    if (!complete) {
        // More frames are needed before this command can be processed.
        return;
    }
    // Install the next assembler before dispatching the finished command.
    _command = new AMQCommand(this.maxInboundMessageBodySize);
    handleCompleteInboundCommand(assembling);
}
/**
 * Placeholder until bug 15786 (a proper exception hierarchy) is addressed.
 * Wraps the shutdown signal in an {@link IOException} without a message, keeping
 * the signal as the cause so that no information is lost.
 *
 * @param ex the exception to wrap
 * @return the wrapped exception
 */
public static IOException wrap(ShutdownSignalException ex) {
    final String noMessage = null;
    return wrap(ex, noMessage);
}
/**
 * Wraps a shutdown signal in an {@link IOException} carrying the given message,
 * with the signal set as the cause.
 *
 * @param ex the exception to wrap; becomes the cause of the result
 * @param message the detail message of the resulting exception, may be {@code null}
 * @return the wrapping exception
 */
public static IOException wrap(ShutdownSignalException ex, String message) {
    return new IOException(message, ex);
}
/**
 * Placeholder until bug 15786 (a proper exception hierarchy) is addressed.
 * Performs a blocking RPC and converts a {@link ShutdownSignalException} into an
 * {@link IOException}. An {@link AlreadyClosedException} is rethrown untouched,
 * since it means the connection/channel was closed by some earlier action.
 *
 * @param m the method to send
 * @return the reply command
 * @throws IOException if sending fails or the RPC ends with a shutdown signal
 */
public AMQCommand exnWrappingRpc(Method m)
    throws IOException
{
    try {
        return privateRpc(m);
    } catch (ShutdownSignalException signal) {
        if (signal instanceof AlreadyClosedException) {
            // Closed in the past: report as is, do not wrap.
            throw signal;
        }
        throw wrap(signal);
    }
}
/**
 * Asynchronous counterpart of {@link #exnWrappingRpc}: starts the RPC and returns
 * the future that will receive the reply. A {@link ShutdownSignalException} raised
 * while starting the call is converted into an {@link IOException}, except for an
 * {@link AlreadyClosedException}, which is rethrown untouched.
 *
 * @param m the method to send
 * @return the future for the reply command
 * @throws IOException if sending fails or a shutdown signal is raised
 */
public CompletableFuture<Command> exnWrappingAsyncRpc(Method m)
    throws IOException
{
    try {
        return privateAsyncRpc(m);
    } catch (ShutdownSignalException signal) {
        if (signal instanceof AlreadyClosedException) {
            // Closed in the past: report as is, do not wrap.
            throw signal;
        }
        throw wrap(signal);
    }
}
/**
 * Private API - handle a command which has been assembled.
 *
 * <p>The command is first reported to the traffic listener and then offered to
 * {@link #processAsync}, which acts as a filter on the inbound command stream.
 * If the filter handles it (asynchronous commands such as deliveries, returns
 * and other events), nothing more is done. Otherwise the command is treated as
 * the reply to the outstanding RPC, if there is one.
 *
 * <p>When reply-type checking is enabled, a reply that the waiting RPC cannot
 * handle is discarded: most likely an earlier request timed out and this is its
 * late reply, so the current request must keep waiting for its own reply.
 *
 * @param command the incoming command
 * @throws IOException if there's any problem
 */
public void handleCompleteInboundCommand(AMQCommand command) throws IOException {
    this._trafficListener.read(command);
    if (processAsync(command)) {
        // Consumed by the asynchronous filter; not an RPC reply.
        return;
    }

    final RpcWrapper nextOutstandingRpc;
    // The compatibility check and the removal of the active RPC happen under a
    // single lock acquisition. Otherwise the active RPC could be replaced between
    // the two steps (timeout cleanup followed by a new enqueue), and this reply
    // would complete an RPC it was never checked against.
    _channelLock.lock();
    try {
        if (_checkRpcResponseType && _activeRpc != null && !_activeRpc.canHandleReply(command)) {
            // Not the reply the current request is waiting for: throw it away.
            return;
        }
        nextOutstandingRpc = nextOutstandingRpc();
    } finally {
        _channelLock.unlock();
    }

    // The outstanding RPC can be null when calling Channel#asyncRpc.
    // Completion runs outside the lock, as before.
    if (nextOutstandingRpc != null) {
        nextOutstandingRpc.complete(command);
        markRpcFinished();
    }
}
/**
 * Registers {@code k} as the outstanding RPC, waiting first for any RPC that is
 * already outstanding to be cleared.
 *
 * @param k the continuation to register
 */
public void enqueueRpc(RpcContinuation k)
{
    final Supplier<RpcWrapper> wrapperFactory = () -> new RpcContinuationRpcWrapper(k);
    doEnqueueRpc(wrapperFactory);
}
/**
 * Registers a future-based RPC as the outstanding RPC, waiting first for any
 * RPC that is already outstanding to be cleared.
 *
 * @param method the request method the future belongs to
 * @param future the future to register
 */
public void enqueueAsyncRpc(Method method, CompletableFuture<Command> future) {
    final Supplier<RpcWrapper> wrapperFactory = () -> new CompletableFutureRpcWrapper(method, future);
    doEnqueueRpc(wrapperFactory);
}
/**
 * Waits under the channel lock until no RPC is outstanding, then installs the
 * wrapper produced by the supplier as the active RPC. Interruptions received
 * while waiting do not abort the wait; the interrupt status is restored once
 * the wait is over, before the wrapper is created.
 *
 * @param rpcWrapperSupplier creates the wrapper to install; called with the lock held
 */
private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
    final ReentrantLock lock = _channelLock;
    lock.lock();
    try {
        boolean interruptedWhileWaiting = false;
        for (;;) {
            if (_activeRpc == null) {
                break;
            }
            try {
                _channelLockCondition.await();
            } catch (InterruptedException e) { //NOSONAR
                // No Sonar: the thread is re-interrupted once the wait is over
                interruptedWhileWaiting = true;
            }
        }
        if (interruptedWhileWaiting) {
            Thread.currentThread().interrupt();
        }
        _activeRpc = rpcWrapperSupplier.get();
    } finally {
        lock.unlock();
    }
}
/**
 * Tells whether an RPC is currently registered as outstanding on this channel.
 * The state is read with the channel lock held.
 *
 * @return true if there is an active RPC
 */
public boolean isOutstandingRpc()
{
    final ReentrantLock lock = _channelLock;
    lock.lock();
    try {
        final boolean outstanding = _activeRpc != null;
        return outstanding;
    } finally {
        lock.unlock();
    }
}
/**
 * Removes and returns the outstanding RPC, if any, and wakes up every thread
 * waiting on the channel lock condition (for instance threads waiting to
 * enqueue their own RPC).
 *
 * @return the RPC that was outstanding, or {@code null} if there was none
 */
public RpcWrapper nextOutstandingRpc()
{
    final ReentrantLock lock = _channelLock;
    lock.lock();
    try {
        final RpcWrapper taken = _activeRpc;
        _activeRpc = null;
        _channelLockCondition.signalAll();
        return taken;
    } finally {
        lock.unlock();
    }
}
/**
 * Hook called in this class right after an outstanding RPC has been completed
 * with its reply, and when RPC state is cleaned up after a timeout.
 * Does nothing here; it is protected, so subclasses may override it.
 */
protected void markRpcFinished() {
    // intentionally empty
}
/**
 * Verifies that this channel is open.
 *
 * @throws AlreadyClosedException carrying the close reason, if the channel is not open
 */
public void ensureIsOpen()
    throws AlreadyClosedException
{
    if (isOpen()) {
        return;
    }
    throw new AlreadyClosedException(getCloseReason());
}
/**
 * Protected API - sends a {@link Method} to the broker and blocks until the
 * next in-bound Command from the broker arrives: only for use from
 * non-connection-MainLoop threads!
 *
 * @param m the method to send
 * @return the reply command
 * @throws IOException if the method cannot be sent or the configured RPC timeout elapses
 * @throws ShutdownSignalException if the RPC ends with a shutdown signal
 */
public AMQCommand rpc(Method m)
    throws IOException, ShutdownSignalException
{
    final AMQCommand reply = privateRpc(m);
    return reply;
}
/**
 * Sends a {@link Method} to the broker and waits for the reply for at most the
 * given timeout instead of the channel-wide RPC timeout.
 *
 * @param m the method to send
 * @param timeout the timeout passed to the reply wait
 * @return the reply command
 * @throws IOException if the method cannot be sent
 * @throws ShutdownSignalException if the RPC ends with a shutdown signal
 * @throws TimeoutException if no reply arrives in time
 */
public AMQCommand rpc(Method m, int timeout)
    throws IOException, ShutdownSignalException, TimeoutException {
    final AMQCommand reply = privateRpc(m, timeout);
    return reply;
}
/**
 * Sends the method and blocks until its reply is available, honouring the
 * channel-wide RPC timeout when one is configured.
 *
 * @param m the method to send
 * @return the reply command
 * @throws IOException if the method cannot be sent, or (as the exception built by
 *         {@link #wrapTimeoutException}) if the RPC timeout elapses
 * @throws ShutdownSignalException if the RPC ends with a shutdown signal
 */
private AMQCommand privateRpc(Method m)
    throws IOException, ShutdownSignalException
{
    final SimpleBlockingRpcContinuation continuation = new SimpleBlockingRpcContinuation(m);
    rpc(m, continuation);

    // The request has been sent. Asking the continuation for its reply puts this
    // thread to sleep until the connection's reader thread hands the reply over,
    // or until the RPC times out (if a timeout is enabled).
    if (_rpcTimeout == NO_RPC_TIMEOUT) {
        return continuation.getReply();
    }
    try {
        return continuation.getReply(_rpcTimeout);
    } catch (TimeoutException timedOut) {
        throw wrapTimeoutException(m, timedOut);
    }
}
/**
 * Best-effort cleanup after an RPC timed out: discards the outstanding RPC so
 * that the channel can accept a new one, then runs the {@link #markRpcFinished}
 * hook. Failures are logged and never propagated, so that the caller can still
 * report the timeout itself.
 */
private void cleanRpcChannelState() {
    try {
        // clean RPC channel state
        nextOutstandingRpc();
        markRpcFinished();
    } catch (Exception ex) {
        // Pass the exception as the trailing argument so the logger records its
        // type and stack trace, not just the message.
        LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage(), ex);
    }
}
/**
 * Cleans the RPC channel state after a timeout, then builds the
 * {@link ChannelContinuationTimeoutException} that describes it.
 *
 * @param m the method whose reply did not arrive in time
 * @param e the original timeout exception
 * @return the exception to throw, referencing this channel, its number and the method
 */
protected ChannelContinuationTimeoutException wrapTimeoutException(final Method m, final TimeoutException e) {
    cleanRpcChannelState();
    final ChannelContinuationTimeoutException wrapped =
        new ChannelContinuationTimeoutException(e, this, this._channelNumber, m);
    return wrapped;
}
/**
 * Starts an asynchronous RPC: creates the future for the reply, registers it and
 * sends the method.
 *
 * @param m the method to send
 * @return the future registered for the reply
 * @throws IOException if the method cannot be sent
 * @throws ShutdownSignalException if a shutdown signal is raised while starting the call
 */
private CompletableFuture<Command> privateAsyncRpc(Method m)
    throws IOException, ShutdownSignalException
{
    final CompletableFuture<Command> replyFuture = new CompletableFuture<>();
    asyncRpc(m, replyFuture);
    return replyFuture;
}
/**
 * Sends the method and waits for its reply using the caller-supplied timeout.
 * On timeout the RPC channel state is cleaned and the original
 * {@link TimeoutException} is rethrown unchanged.
 *
 * @param m the method to send
 * @param timeout the timeout passed to the reply wait
 * @return the reply command
 * @throws IOException if the method cannot be sent
 * @throws ShutdownSignalException if the RPC ends with a shutdown signal
 * @throws TimeoutException if no reply arrives in time
 */
private AMQCommand privateRpc(Method m, int timeout)
    throws IOException, ShutdownSignalException, TimeoutException {
    final SimpleBlockingRpcContinuation continuation = new SimpleBlockingRpcContinuation(m);
    rpc(m, continuation);
    final AMQCommand reply;
    try {
        reply = continuation.getReply(timeout);
    } catch (TimeoutException timedOut) {
        cleanRpcChannelState();
        throw timedOut;
    }
    return reply;
}
/**
 * Registers the continuation and sends the method, after checking that the
 * channel is open. The whole sequence runs with the channel lock held.
 *
 * @param m the method to send
 * @param k the continuation that will receive the reply
 * @throws IOException if the method cannot be sent
 */
public void rpc(Method m, RpcContinuation k)
    throws IOException
{
    final ReentrantLock lock = _channelLock;
    lock.lock();
    try {
        ensureIsOpen();
        quiescingRpc(m, k);
    } finally {
        lock.unlock();
    }
}
/**
 * Registers the continuation and sends the method without first checking that
 * the channel is open. Both steps run with the channel lock held.
 *
 * @param m the method to send
 * @param k the continuation that will receive the reply
 * @throws IOException if the method cannot be sent
 */
public void quiescingRpc(Method m, RpcContinuation k)
    throws IOException
{
    final ReentrantLock lock = _channelLock;
    lock.lock();
    try {
        enqueueRpc(k);
        quiescingTransmit(m);
    } finally {
        lock.unlock();
    }
}
/**
 * Registers the future and sends the method, after checking that the channel is
 * open. The whole sequence runs with the channel lock held.
 *
 * @param m the method to send
 * @param future the future that will receive the reply
 * @throws IOException if the method cannot be sent
 */
public void asyncRpc(Method m, CompletableFuture<Command> future)
    throws IOException
{
    final ReentrantLock lock = _channelLock;
    lock.lock();
    try {
        ensureIsOpen();
        quiescingAsyncRpc(m, future);
    } finally {
        lock.unlock();
    }
}
/**
 * Registers the future and sends the method without first checking that the
 * channel is open. Both steps run with the channel lock held.
 *
 * @param m the method to send
 * @param future the future that will receive the reply
 * @throws IOException if the method cannot be sent
 */
public void quiescingAsyncRpc(Method m, CompletableFuture<Command> future)
    throws IOException
{
    final ReentrantLock lock = _channelLock;
    lock.lock();
    try {
        enqueueAsyncRpc(m, future);
        quiescingTransmit(m);
    } finally {
        lock.unlock();
    }
}
/**
 * Protected API - filter applied to every complete inbound command before it is
 * treated as an RPC reply. In this class it is called by
 * {@link #handleCompleteInboundCommand}: when it returns true the command is
 * considered handled and is not processed further; when it returns false the
 * command is handed to the outstanding RPC, if any.
 * This is used in subclasses to implement handling of Basic.Return and
 * Basic.Deliver messages, as well as Channel.Close and Connection.Close.
 *
 * @param command the command to handle asynchronously
 * @return true if we handled the command; otherwise the caller should consider it "unhandled"
 * @throws IOException if handling the command fails
 */
public abstract boolean processAsync(Command command) throws IOException;
/**
 * Describes this channel as {@code AMQChannel(<connection>,<channel number>)}.
 *
 * @return the textual description
 */
@Override
public String toString() {
    final StringBuilder description = new StringBuilder("AMQChannel(");
    description.append(_connection);
    description.append(",");
    description.append(_channelNumber);
    description.append(")");
    return description.toString();
}
/**
 * Protected API - respond, in the driver thread, to a {@link ShutdownSignalException}.
 * With the channel lock held, the signal is recorded as the shutdown cause if the
 * channel is still open, and threads waiting on the channel lock condition are
 * woken up. Whatever the outcome, the outstanding RPC is notified afterwards when
 * {@code notifyRpc} is set.
 *
 * @param signal the signal to handle
 * @param ignoreClosed the flag indicating whether to ignore the AlreadyClosedException
 *                     thrown when the channel is already closed
 * @param notifyRpc the flag indicating whether any remaining rpc continuation should be
 *                  notified with the given signal
 */
public void processShutdownSignal(ShutdownSignalException signal,
                                  boolean ignoreClosed,
                                  boolean notifyRpc) {
    final ReentrantLock lock = _channelLock;
    try {
        lock.lock();
        try {
            final boolean causeSet = setShutdownCauseIfOpen(signal);
            if (!causeSet && !ignoreClosed) {
                throw new AlreadyClosedException(getCloseReason());
            }
            _channelLockCondition.signalAll();
        } finally {
            lock.unlock();
        }
    } finally {
        // Runs even when AlreadyClosedException is thrown above.
        if (notifyRpc) {
            notifyOutstandingRpc(signal);
        }
    }
}
/**
 * Removes the outstanding RPC, if any, and notifies it of the shutdown signal.
 *
 * @param signal the shutdown signal to pass to the outstanding RPC
 */
public void notifyOutstandingRpc(ShutdownSignalException signal) {
    final RpcWrapper pending = nextOutstandingRpc();
    if (pending == null) {
        return;
    }
    pending.shutdown(signal);
}
/**
 * Wraps the method in a command and transmits it, with the channel lock held.
 * The channel must be open.
 *
 * @param m the method to send
 * @throws IOException if transmission fails
 */
public void transmit(Method m) throws IOException {
    final ReentrantLock lock = _channelLock;
    lock.lock();
    try {
        final AMQCommand command = new AMQCommand(m);
        transmit(command);
    } finally {
        lock.unlock();
    }
}
/**
 * Transmits the command after checking that the channel is open. Both steps run
 * with the channel lock held.
 *
 * @param c the command to send
 * @throws IOException if transmission fails
 */
public void transmit(AMQCommand c) throws IOException {
    final ReentrantLock lock = _channelLock;
    lock.lock();
    try {
        ensureIsOpen();
        quiescingTransmit(c);
    } finally {
        lock.unlock();
    }
}
/**
 * Wraps the method in a command and transmits it without first checking that
 * the channel is open. Runs with the channel lock held.
 *
 * @param m the method to send
 * @throws IOException if transmission fails
 */
public void quiescingTransmit(Method m) throws IOException {
    final ReentrantLock lock = _channelLock;
    lock.lock();
    try {
        final AMQCommand command = new AMQCommand(m);
        quiescingTransmit(command);
    } finally {
        lock.unlock();
    }
}
/**
 * Transmits the command without first checking that the channel is open.
 * Runs with the channel lock held.
 *
 * <p>A command whose method carries content waits while content transmission is
 * blocked. After each wake-up the channel must still be open: currently no
 * content-bearing command is allowed to be sent in a closing state.
 *
 * <p>An interruption received while waiting does not abort the wait. The
 * interrupt status is restored only once the wait is over (normally or through
 * an exception). Restoring it inside the loop would make the next
 * {@code await()} throw immediately without releasing the lock, so the thread
 * would spin while holding the channel lock.
 *
 * @param c the command to send
 * @throws IOException if transmission fails
 * @throws AlreadyClosedException if the channel is found closed after waking up
 */
public void quiescingTransmit(AMQCommand c) throws IOException {
    _channelLock.lock();
    try {
        if (c.getMethod().hasContent()) {
            boolean interruptedWhileWaiting = false;
            try {
                while (_blockContent) {
                    try {
                        _channelLockCondition.await();
                    } catch (InterruptedException e) { //NOSONAR
                        // No Sonar: the thread is re-interrupted once the wait is over
                        interruptedWhileWaiting = true;
                    }
                    // This is to catch a situation when the thread wakes up during
                    // shutdown.
                    ensureIsOpen();
                }
            } finally {
                if (interruptedWhileWaiting) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        this._trafficListener.write(c);
        c.transmit(this);
    } finally {
        _channelLock.unlock();
    }
}
/**
 * Returns the connection this channel is associated with.
 *
 * @return the connection given at construction
 */
public AMQConnection getConnection() {
    return this._connection;
}
/**
 * Callback for the outcome of an RPC issued on a channel: it receives either the
 * reply command or a shutdown signal.
 */
public interface RpcContinuation {
/**
 * Receives the reply command of the RPC.
 * @param command the reply command
 */
void handleCommand(AMQCommand command);
/** @return true if the reply command can be handled for this request */
boolean canHandleReply(AMQCommand command);
/**
 * Receives the shutdown signal that ended the RPC instead of a reply.
 * @param signal the shutdown signal
 */
void handleShutdownSignal(ShutdownSignalException signal);
}
/**
 * Continuation that lets a caller block until the RPC outcome is available.
 * The reply (after {@link #transformReply}) or the shutdown signal is stored in a
 * blocking holder that {@link #getReply()} and {@link #getReply(int)} read from.
 *
 * @param <T> the type the reply command is transformed into
 */
public static abstract class BlockingRpcContinuation<T> implements RpcContinuation {
    /** Holder that receives either the transformed reply or the shutdown signal. */
    public final BlockingValueOrException<T, ShutdownSignalException> _blocker =
        new BlockingValueOrException<>();

    /** The request this continuation waits on; {@code null} when unknown. */
    protected final Method request;

    /** Creates a continuation that is not tied to a known request. */
    public BlockingRpcContinuation() {
        this.request = null;
    }

    /**
     * Creates a continuation for the given request.
     *
     * @param request the request whose reply is awaited
     */
    public BlockingRpcContinuation(final Method request) {
        this.request = request;
    }

    /** Stores the transformed reply so that waiting callers can pick it up. */
    @Override
    public void handleCommand(AMQCommand command) {
        final T transformed = transformReply(command);
        _blocker.setValue(transformed);
    }

    /** Stores the shutdown signal so that waiting callers get it thrown. */
    @Override
    public void handleShutdownSignal(ShutdownSignalException signal) {
        _blocker.setException(signal);
    }

    /**
     * Waits, without a time limit, for the outcome of the RPC.
     *
     * @return the transformed reply
     * @throws ShutdownSignalException if the RPC ended with a shutdown signal
     */
    public T getReply() throws ShutdownSignalException
    {
        return _blocker.uninterruptibleGetValue();
    }

    /**
     * Waits for the outcome of the RPC for at most the given timeout.
     *
     * @param timeout the timeout passed to the blocking holder
     * @return the transformed reply
     * @throws ShutdownSignalException if the RPC ended with a shutdown signal
     * @throws TimeoutException if no outcome is available in time
     */
    public T getReply(int timeout)
        throws ShutdownSignalException, TimeoutException
    {
        return _blocker.uninterruptibleGetValue(timeout);
    }

    /** Checks the reply's method against the request of this continuation. */
    @Override
    public boolean canHandleReply(AMQCommand command) {
        final Method response = command.getMethod();
        return isResponseCompatibleWithRequest(request, response);
    }

    /**
     * Converts the reply command into the value handed to the waiting caller.
     *
     * @param command the reply command
     * @return the value to deliver
     */
    public abstract T transformReply(AMQCommand command);

    /**
     * Best-effort check that a response was intended for a request. Ideally each
     * request would carry an id echoed by its reply; short of that, this check at
     * least protects against ClassCastExceptions. Requests that are unknown here,
     * or {@code null}, are considered compatible with any response.
     *
     * @param request the request method, may be {@code null}
     * @param response the response method
     * @return false only when the response is known not to match the request
     */
    public static boolean isResponseCompatibleWithRequest(Method request, Method response) {
        if (request == null) {
            // for passivity default to true
            return true;
        }
        if (request instanceof Basic.Qos) {
            return response instanceof Basic.QosOk;
        }
        if (request instanceof Basic.Get) {
            return response instanceof Basic.GetOk || response instanceof Basic.GetEmpty;
        }
        if (request instanceof Basic.Consume) {
            if (!(response instanceof Basic.ConsumeOk)) {
                return false;
            }
            // The consumer tags must match too, unless the request left the tag
            // empty so that the server generates one.
            final String requestedTag = ((Basic.Consume) request).getConsumerTag();
            if (requestedTag == null || requestedTag.isEmpty()) {
                return true;
            }
            return requestedTag.equals(((Basic.ConsumeOk) response).getConsumerTag());
        }
        if (request instanceof Basic.Cancel) {
            if (!(response instanceof Basic.CancelOk)) {
                return false;
            }
            // The consumer tags must match too.
            final String cancelledTag = ((Basic.Cancel) request).getConsumerTag();
            return cancelledTag.equals(((Basic.CancelOk) response).getConsumerTag());
        }
        if (request instanceof Basic.Recover) {
            return response instanceof Basic.RecoverOk;
        }
        if (request instanceof Exchange.Declare) {
            return response instanceof Exchange.DeclareOk;
        }
        if (request instanceof Exchange.Delete) {
            return response instanceof Exchange.DeleteOk;
        }
        if (request instanceof Exchange.Bind) {
            return response instanceof Exchange.BindOk;
        }
        if (request instanceof Exchange.Unbind) {
            return response instanceof Exchange.UnbindOk;
        }
        if (request instanceof Queue.Declare) {
            // The queue name cannot be checked, as the server can strip some
            // characters (see the QueueLifecycle test and rabbitmq-server issue 710).
            return response instanceof Queue.DeclareOk;
        }
        if (request instanceof Queue.Delete) {
            return response instanceof Queue.DeleteOk;
        }
        if (request instanceof Queue.Bind) {
            return response instanceof Queue.BindOk;
        }
        if (request instanceof Queue.Unbind) {
            return response instanceof Queue.UnbindOk;
        }
        if (request instanceof Queue.Purge) {
            return response instanceof Queue.PurgeOk;
        }
        if (request instanceof Tx.Select) {
            return response instanceof Tx.SelectOk;
        }
        if (request instanceof Tx.Commit) {
            return response instanceof Tx.CommitOk;
        }
        if (request instanceof Tx.Rollback) {
            return response instanceof Tx.RollbackOk;
        }
        if (request instanceof Confirm.Select) {
            return response instanceof Confirm.SelectOk;
        }
        // for passivity default to true
        return true;
    }
}
/**
 * Blocking continuation that delivers the reply command itself, untransformed.
 */
public static class SimpleBlockingRpcContinuation
    extends BlockingRpcContinuation<AMQCommand>
{
    /** Creates a continuation that is not tied to a known request. */
    public SimpleBlockingRpcContinuation() {
        super();
    }

    /**
     * Creates a continuation for the given request.
     *
     * @param method the request whose reply is awaited
     */
    public SimpleBlockingRpcContinuation(final Method method) {
        super(method);
    }

    /**
     * Returns the reply command unchanged.
     *
     * @param command the reply command
     * @return the same command
     */
    @Override
    public AMQCommand transformReply(AMQCommand command) {
        final AMQCommand reply = command;
        return reply;
    }
}
/**
 * Gives subclasses access to the connection information captured at construction.
 *
 * @return the connection information obtained from the connection
 */
protected ObservationCollector.ConnectionInfo connectionInfo() {
    final ObservationCollector.ConnectionInfo info = this.connectionInfo;
    return info;
}
}