Skip to content

Commit b47f17c

Browse files
committed
Record recoverable entities in RPC channel methods
[#162201529] Fixes #425 (cherry picked from commit f560108)
1 parent 7168133 commit b47f17c

File tree

3 files changed

+353
-19
lines changed

3 files changed

+353
-19
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 109 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
package com.rabbitmq.client.impl.recovery;
1717

1818
import com.rabbitmq.client.*;
19-
import com.rabbitmq.client.RecoverableChannel;
19+
import com.rabbitmq.client.impl.AMQCommand;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
2022

2123
import java.io.IOException;
2224
import java.util.*;
@@ -31,6 +33,9 @@
3133
* @since 3.3.0
3234
*/
3335
public class AutorecoveringChannel implements RecoverableChannel {
36+
37+
private static final Logger LOGGER = LoggerFactory.getLogger(AutorecoveringChannel.class);
38+
3439
private volatile RecoveryAwareChannelN delegate;
3540
private volatile AutorecoveringConnection connection;
3641
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<ShutdownListener>();
@@ -235,12 +240,7 @@ public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeT
235240
@Override
236241
public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException {
237242
final AMQP.Exchange.DeclareOk ok = delegate.exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments);
238-
RecordedExchange x = new RecordedExchange(this, exchange).
239-
type(type).
240-
durable(durable).
241-
autoDelete(autoDelete).
242-
arguments(arguments);
243-
recordExchange(exchange, x);
243+
recordExchange(ok, exchange, type, durable, autoDelete, arguments);
244244
return ok;
245245
}
246246

@@ -331,15 +331,7 @@ public AMQP.Queue.DeclareOk queueDeclare() throws IOException {
331331
@Override
332332
public AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
333333
final AMQP.Queue.DeclareOk ok = delegate.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
334-
RecordedQueue q = new RecordedQueue(this, ok.getQueue()).
335-
durable(durable).
336-
exclusive(exclusive).
337-
autoDelete(autoDelete).
338-
arguments(arguments);
339-
if (queue.equals(RecordedQueue.EMPTY_STRING)) {
340-
q.serverNamed(true);
341-
}
342-
recordQueue(ok, q);
334+
recordQueue(ok, queue, durable, exclusive, autoDelete, arguments);
343335
return ok;
344336
}
345337

@@ -714,7 +706,10 @@ public void asyncRpc(Method method) throws IOException {
714706

715707
@Override
716708
public Command rpc(Method method) throws IOException {
717-
return delegate.rpc(method);
709+
recordOnRpcRequest(method);
710+
AMQCommand response = delegate.rpc(method);
711+
recordOnRpcResponse(response.getMethod(), method);
712+
return response;
718713
}
719714

720715
/**
@@ -840,6 +835,18 @@ private boolean deleteRecordedExchangeBinding(String destination, String source,
840835
return this.connection.deleteRecordedExchangeBinding(this, destination, source, routingKey, arguments);
841836
}
842837

838+
private void recordQueue(AMQP.Queue.DeclareOk ok, String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {
839+
RecordedQueue q = new RecordedQueue(this, ok.getQueue()).
840+
durable(durable).
841+
exclusive(exclusive).
842+
autoDelete(autoDelete).
843+
arguments(arguments);
844+
if (queue.equals(RecordedQueue.EMPTY_STRING)) {
845+
q.serverNamed(true);
846+
}
847+
recordQueue(ok, q);
848+
}
849+
843850
private void recordQueue(AMQP.Queue.DeclareOk ok, RecordedQueue q) {
844851
this.connection.recordQueue(ok, q);
845852
}
@@ -852,6 +859,15 @@ private void deleteRecordedQueue(String queue) {
852859
this.connection.deleteRecordedQueue(queue);
853860
}
854861

862+
private void recordExchange(AMQP.Exchange.DeclareOk ok, String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
863+
RecordedExchange x = new RecordedExchange(this, exchange).
864+
type(type).
865+
durable(durable).
866+
autoDelete(autoDelete).
867+
arguments(arguments);
868+
recordExchange(exchange, x);
869+
}
870+
855871
private void recordExchange(String exchange, RecordedExchange x) {
856872
this.connection.recordExchange(exchange, x);
857873
}
@@ -898,7 +914,82 @@ void updateConsumerTag(String tag, String newTag) {
898914

899915
@Override
900916
public CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException {
901-
return this.delegate.asyncCompletableRpc(method);
917+
recordOnRpcRequest(method);
918+
CompletableFuture<Command> future = this.delegate.asyncCompletableRpc(method);
919+
future.thenAccept(command -> {
920+
if (command != null) {
921+
recordOnRpcResponse(command.getMethod(), method);
922+
}
923+
});
924+
return future;
925+
}
926+
927+
private void recordOnRpcRequest(Method method) {
928+
if (method instanceof AMQP.Queue.Delete) {
929+
deleteRecordedQueue(((AMQP.Queue.Delete) method).getQueue());
930+
} else if (method instanceof AMQP.Exchange.Delete) {
931+
deleteRecordedExchange(((AMQP.Exchange.Delete) method).getExchange());
932+
} else if (method instanceof AMQP.Queue.Unbind) {
933+
AMQP.Queue.Unbind unbind = (AMQP.Queue.Unbind) method;
934+
deleteRecordedQueueBinding(
935+
unbind.getQueue(), unbind.getExchange(),
936+
unbind.getRoutingKey(), unbind.getArguments()
937+
);
938+
this.maybeDeleteRecordedAutoDeleteExchange(unbind.getExchange());
939+
} else if (method instanceof AMQP.Exchange.Unbind) {
940+
AMQP.Exchange.Unbind unbind = (AMQP.Exchange.Unbind) method;
941+
deleteRecordedExchangeBinding(
942+
unbind.getDestination(), unbind.getSource(),
943+
unbind.getRoutingKey(), unbind.getArguments()
944+
);
945+
this.maybeDeleteRecordedAutoDeleteExchange(unbind.getSource());
946+
}
947+
}
948+
949+
private void recordOnRpcResponse(Method response, Method request) {
950+
if (response instanceof AMQP.Queue.DeclareOk) {
951+
if (request instanceof AMQP.Queue.Declare) {
952+
AMQP.Queue.DeclareOk ok = (AMQP.Queue.DeclareOk) response;
953+
AMQP.Queue.Declare declare = (AMQP.Queue.Declare) request;
954+
recordQueue(
955+
ok, declare.getQueue(),
956+
declare.getDurable(), declare.getExclusive(), declare.getAutoDelete(),
957+
declare.getArguments()
958+
);
959+
} else {
960+
LOGGER.warn("RPC response {} and RPC request {} not compatible, topology not recorded.",
961+
response.getClass(), request.getClass());
962+
}
963+
} else if (response instanceof AMQP.Exchange.DeclareOk) {
964+
if (request instanceof AMQP.Exchange.Declare) {
965+
AMQP.Exchange.DeclareOk ok = (AMQP.Exchange.DeclareOk) response;
966+
AMQP.Exchange.Declare declare = (AMQP.Exchange.Declare) request;
967+
recordExchange(
968+
ok, declare.getExchange(), declare.getType(),
969+
declare.getDurable(), declare.getAutoDelete(),
970+
declare.getArguments()
971+
);
972+
} else {
973+
LOGGER.warn("RPC response {} and RPC request {} not compatible, topology not recorded.",
974+
response.getClass(), request.getClass());
975+
}
976+
} else if (response instanceof AMQP.Queue.BindOk) {
977+
if (request instanceof AMQP.Queue.Bind) {
978+
AMQP.Queue.Bind bind = (AMQP.Queue.Bind) request;
979+
recordQueueBinding(bind.getQueue(), bind.getExchange(), bind.getRoutingKey(), bind.getArguments());
980+
} else {
981+
LOGGER.warn("RPC response {} and RPC request {} not compatible, topology not recorded.",
982+
response.getClass(), request.getClass());
983+
}
984+
} else if (response instanceof AMQP.Exchange.BindOk) {
985+
if (request instanceof AMQP.Exchange.Bind) {
986+
AMQP.Exchange.Bind bind = (AMQP.Exchange.Bind) request;
987+
recordExchangeBinding(bind.getDestination(), bind.getSource(), bind.getRoutingKey(), bind.getArguments());
988+
} else {
989+
LOGGER.warn("RPC response {} and RPC request {} not compatible, topology not recorded.",
990+
response.getClass(), request.getClass());
991+
}
992+
}
902993
}
903994

904995
@Override

src/test/java/com/rabbitmq/client/test/ClientTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@
6767
AddressTest.class,
6868
DefaultRetryHandlerTest.class,
6969
NioDeadlockOnConnectionClosing.class,
70-
GeneratedClassesTest.class
70+
GeneratedClassesTest.class,
71+
RpcTopologyRecordingTest.class
7172
})
7273
public class ClientTests {
7374

0 commit comments

Comments
 (0)