Skip to content

Commit 6ed65b2

Browse files
committed
Record recoverable entities in RPC channel methods
[#162201529] Fixes #425 (cherry picked from commit f560108) Conflicts: src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java (cherry picked from commit 62d0584)
1 parent 68c96a1 commit 6ed65b2

File tree

5 files changed

+387
-20
lines changed

5 files changed

+387
-20
lines changed

pom.xml

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,15 @@
7575
<groovy.all.version>2.4.8</groovy.all.version>
7676
<keytool.maven.plugin.version>1.5</keytool.maven.plugin.version>
7777
<build.helper.maven-plugin.version>1.12</build.helper.maven-plugin.version>
78-
<maven.compiler.plugin.version>3.5.1</maven.compiler.plugin.version>
78+
<maven.compiler.plugin.version>3.8.0</maven.compiler.plugin.version>
7979
<maven.surefire.plugin.version>2.19.1</maven.surefire.plugin.version>
8080
<maven.failsafe.plugin.version>2.19.1</maven.failsafe.plugin.version>
8181
<maven.gpg.plugin.version>1.6</maven.gpg.plugin.version>
8282
<maven.jar.plugin.version>3.0.2</maven.jar.plugin.version>
8383
<maven.bundle.plugin.version>2.3.7</maven.bundle.plugin.version>
8484
<maven.packagecloud.wagon.version>0.0.6</maven.packagecloud.wagon.version>
8585
<checksum.maven.plugin.version>1.8</checksum.maven.plugin.version>
86+
<animal-sniffer-maven-plugin.version>1.16</animal-sniffer-maven-plugin.version>
8687

8788
<!--
8889
These groovy scripts are used later in this POM file to generate
@@ -872,13 +873,51 @@
872873
<configuration>
873874
<source>1.6</source>
874875
<target>1.6</target>
876+
<testSource>1.8</testSource>
877+
<testTarget>1.8</testTarget>
875878
<compilerArgs>
876879
<arg>-Xlint:deprecation</arg>
877880
<arg>-Xlint:unchecked</arg>
878881
</compilerArgs>
879882
</configuration>
880883
</plugin>
881884

885+
<plugin>
886+
<groupId>org.codehaus.mojo</groupId>
887+
<artifactId>animal-sniffer-maven-plugin</artifactId>
888+
<version>${animal-sniffer-maven-plugin.version}</version>
889+
<executions>
890+
<execution>
891+
<id>signature-check-in-verify</id>
892+
<phase>verify</phase>
893+
<goals>
894+
<goal>check</goal>
895+
</goals>
896+
</execution>
897+
<execution>
898+
<id>signature-check-before-packaging</id>
899+
<phase>package</phase>
900+
<goals>
901+
<goal>check</goal>
902+
</goals>
903+
</execution>
904+
</executions>
905+
<configuration>
906+
<ignores>
907+
<!--
908+
To avoid failure because of call to SSLParameters#setEndpointIdentificationAlgorithm (Java 7)
909+
It is mentioned in the Javadoc that the class making this call requires Java 7.
910+
-->
911+
<ignore>javax.net.ssl.SSLParameters</ignore>
912+
</ignores>
913+
<signature>
914+
<groupId>org.codehaus.mojo.signature</groupId>
915+
<artifactId>java16</artifactId>
916+
<version>1.0</version>
917+
</signature>
918+
</configuration>
919+
</plugin>
920+
882921
<!--
883922
Disable the unit tests plugin because we only have integration
884923
tests.

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 101 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
package com.rabbitmq.client.impl.recovery;
1717

1818
import com.rabbitmq.client.*;
19-
import com.rabbitmq.client.RecoverableChannel;
19+
import com.rabbitmq.client.impl.AMQCommand;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
2022

2123
import java.io.IOException;
2224
import java.util.*;
@@ -30,6 +32,9 @@
3032
* @since 3.3.0
3133
*/
3234
public class AutorecoveringChannel implements RecoverableChannel {
35+
36+
private static final Logger LOGGER = LoggerFactory.getLogger(AutorecoveringChannel.class);
37+
3338
private volatile RecoveryAwareChannelN delegate;
3439
private volatile AutorecoveringConnection connection;
3540
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<ShutdownListener>();
@@ -235,12 +240,7 @@ public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeT
235240
@Override
236241
public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException {
237242
final AMQP.Exchange.DeclareOk ok = delegate.exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments);
238-
RecordedExchange x = new RecordedExchange(this, exchange).
239-
type(type).
240-
durable(durable).
241-
autoDelete(autoDelete).
242-
arguments(arguments);
243-
recordExchange(exchange, x);
243+
recordExchange(ok, exchange, type, durable, autoDelete, arguments);
244244
return ok;
245245
}
246246

@@ -331,15 +331,7 @@ public AMQP.Queue.DeclareOk queueDeclare() throws IOException {
331331
@Override
332332
public AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
333333
final AMQP.Queue.DeclareOk ok = delegate.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
334-
RecordedQueue q = new RecordedQueue(this, ok.getQueue()).
335-
durable(durable).
336-
exclusive(exclusive).
337-
autoDelete(autoDelete).
338-
arguments(arguments);
339-
if (queue.equals(RecordedQueue.EMPTY_STRING)) {
340-
q.serverNamed(true);
341-
}
342-
recordQueue(ok, q);
334+
recordQueue(ok, queue, durable, exclusive, autoDelete, arguments);
343335
return ok;
344336
}
345337

@@ -356,7 +348,6 @@ public void queueDeclareNoWait(String queue,
356348
arguments(arguments);
357349
delegate.queueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments);
358350
recordQueue(queue, meta);
359-
360351
}
361352

362353
@Override
@@ -546,7 +537,10 @@ public void asyncRpc(Method method) throws IOException {
546537

547538
@Override
548539
public Command rpc(Method method) throws IOException {
549-
return delegate.rpc(method);
540+
recordOnRpcRequest(method);
541+
AMQCommand response = delegate.rpc(method);
542+
recordOnRpcResponse(response.getMethod(), method);
543+
return response;
550544
}
551545

552546
/**
@@ -680,6 +674,18 @@ private boolean deleteRecordedExchangeBinding(String destination, String source,
680674
return this.connection.deleteRecordedExchangeBinding(this, destination, source, routingKey, arguments);
681675
}
682676

677+
private void recordQueue(AMQP.Queue.DeclareOk ok, String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {
678+
RecordedQueue q = new RecordedQueue(this, ok.getQueue()).
679+
durable(durable).
680+
exclusive(exclusive).
681+
autoDelete(autoDelete).
682+
arguments(arguments);
683+
if (queue.equals(RecordedQueue.EMPTY_STRING)) {
684+
q.serverNamed(true);
685+
}
686+
recordQueue(ok, q);
687+
}
688+
683689
private void recordQueue(AMQP.Queue.DeclareOk ok, RecordedQueue q) {
684690
this.connection.recordQueue(ok, q);
685691
}
@@ -692,6 +698,15 @@ private void deleteRecordedQueue(String queue) {
692698
this.connection.deleteRecordedQueue(queue);
693699
}
694700

701+
private void recordExchange(AMQP.Exchange.DeclareOk ok, String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
702+
RecordedExchange x = new RecordedExchange(this, exchange).
703+
type(type).
704+
durable(durable).
705+
autoDelete(autoDelete).
706+
arguments(arguments);
707+
recordExchange(exchange, x);
708+
}
709+
695710
private void recordExchange(String exchange, RecordedExchange x) {
696711
this.connection.recordExchange(exchange, x);
697712
}
@@ -736,6 +751,74 @@ void updateConsumerTag(String tag, String newTag) {
736751
}
737752
}
738753

754+
private void recordOnRpcRequest(Method method) {
755+
if (method instanceof AMQP.Queue.Delete) {
756+
deleteRecordedQueue(((AMQP.Queue.Delete) method).getQueue());
757+
} else if (method instanceof AMQP.Exchange.Delete) {
758+
deleteRecordedExchange(((AMQP.Exchange.Delete) method).getExchange());
759+
} else if (method instanceof AMQP.Queue.Unbind) {
760+
AMQP.Queue.Unbind unbind = (AMQP.Queue.Unbind) method;
761+
deleteRecordedQueueBinding(
762+
unbind.getQueue(), unbind.getExchange(),
763+
unbind.getRoutingKey(), unbind.getArguments()
764+
);
765+
this.maybeDeleteRecordedAutoDeleteExchange(unbind.getExchange());
766+
} else if (method instanceof AMQP.Exchange.Unbind) {
767+
AMQP.Exchange.Unbind unbind = (AMQP.Exchange.Unbind) method;
768+
deleteRecordedExchangeBinding(
769+
unbind.getDestination(), unbind.getSource(),
770+
unbind.getRoutingKey(), unbind.getArguments()
771+
);
772+
this.maybeDeleteRecordedAutoDeleteExchange(unbind.getSource());
773+
}
774+
}
775+
776+
private void recordOnRpcResponse(Method response, Method request) {
777+
if (response instanceof AMQP.Queue.DeclareOk) {
778+
if (request instanceof AMQP.Queue.Declare) {
779+
AMQP.Queue.DeclareOk ok = (AMQP.Queue.DeclareOk) response;
780+
AMQP.Queue.Declare declare = (AMQP.Queue.Declare) request;
781+
recordQueue(
782+
ok, declare.getQueue(),
783+
declare.getDurable(), declare.getExclusive(), declare.getAutoDelete(),
784+
declare.getArguments()
785+
);
786+
} else {
787+
LOGGER.warn("RPC response {} and RPC request {} not compatible, topology not recorded.",
788+
response.getClass(), request.getClass());
789+
}
790+
} else if (response instanceof AMQP.Exchange.DeclareOk) {
791+
if (request instanceof AMQP.Exchange.Declare) {
792+
AMQP.Exchange.DeclareOk ok = (AMQP.Exchange.DeclareOk) response;
793+
AMQP.Exchange.Declare declare = (AMQP.Exchange.Declare) request;
794+
recordExchange(
795+
ok, declare.getExchange(), declare.getType(),
796+
declare.getDurable(), declare.getAutoDelete(),
797+
declare.getArguments()
798+
);
799+
} else {
800+
LOGGER.warn("RPC response {} and RPC request {} not compatible, topology not recorded.",
801+
response.getClass(), request.getClass());
802+
}
803+
} else if (response instanceof AMQP.Queue.BindOk) {
804+
if (request instanceof AMQP.Queue.Bind) {
805+
AMQP.Queue.Bind bind = (AMQP.Queue.Bind) request;
806+
recordQueueBinding(bind.getQueue(), bind.getExchange(), bind.getRoutingKey(), bind.getArguments());
807+
} else {
808+
LOGGER.warn("RPC response {} and RPC request {} not compatible, topology not recorded.",
809+
response.getClass(), request.getClass());
810+
}
811+
} else if (response instanceof AMQP.Exchange.BindOk) {
812+
if (request instanceof AMQP.Exchange.Bind) {
813+
AMQP.Exchange.Bind bind = (AMQP.Exchange.Bind) request;
814+
recordExchangeBinding(bind.getDestination(), bind.getSource(), bind.getRoutingKey(), bind.getArguments());
815+
} else {
816+
LOGGER.warn("RPC response {} and RPC request {} not compatible, topology not recorded.",
817+
response.getClass(), request.getClass());
818+
}
819+
}
820+
}
821+
739822
@Override
740823
public String toString() {
741824
return this.delegate.toString();

src/test/java/com/rabbitmq/client/test/ClientTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363
AddressTest.class,
6464
DefaultRetryHandlerTest.class,
6565
NioDeadlockOnConnectionClosing.class,
66-
GeneratedClassesTest.class
66+
GeneratedClassesTest.class,
67+
RpcTopologyRecordingTest.class
6768
})
6869
public class ClientTests {
6970

0 commit comments

Comments
 (0)