Skip to content

Commit 28df0d0

Browse files
Merge branch '5.x.x-stable'
2 parents 83d5fc3 + b4e0a26 commit 28df0d0

16 files changed

+254
-103
lines changed

LICENSE-MPL-RabbitMQ

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,4 +364,4 @@ file, then You may include the notice in a location (such as a LICENSE
364364
file in a relevant directory) where a recipient would be likely to look
365365
for such a notice.
366366

367-
Copyright (c) 2007-2020 VMware, Inc. or its affiliates.
367+
Copyright (c) 2007-2021 VMware, Inc. or its affiliates.

RUNNING_TESTS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,4 @@ Launch the tests:
107107
```
108108

109109
Note the `rabbitmqctl.bin` system property uses the syntax
110-
`DOCKER:{containerId}`.
110+
`DOCKER:{containerId}`.

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
import com.rabbitmq.client.impl.nio.NioParams;
2020
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
2121
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
22+
import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier;
2223
import com.rabbitmq.client.impl.recovery.RetryHandler;
2324
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2427

2528
import java.util.Map.Entry;
2629
import java.util.function.BiConsumer;
@@ -49,6 +52,7 @@
4952
*/
5053
public class ConnectionFactory implements Cloneable {
5154

55+
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionFactory.class);
5256
private static final int MAX_UNSIGNED_SHORT = 65535;
5357

5458
/** Default user name */
@@ -186,6 +190,7 @@ public class ConnectionFactory implements Cloneable {
186190
* @since 5.4.0
187191
*/
188192
private RetryHandler topologyRecoveryRetryHandler;
193+
private RecoveredQueueNameSupplier recoveredQueueNameSupplier;
189194

190195
/**
191196
* Traffic listener notified of inbound and outbound {@link Command}s.
@@ -1261,6 +1266,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
12611266
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
12621267
result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition);
12631268
result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
1269+
result.setRecoveredQueueNameSupplier(recoveredQueueNameSupplier);
12641270
result.setTrafficListener(trafficListener);
12651271
result.setCredentialsRefreshService(credentialsRefreshService);
12661272
return result;
@@ -1639,6 +1645,15 @@ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHa
16391645
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
16401646
}
16411647

1648+
/**
1649+
* Set the recovered queue name supplier. Default is use the same queue name when recovering queues.
1650+
*
1651+
* @param recoveredQueueNameSupplier queue name supplier
1652+
*/
1653+
public void setRecoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
1654+
this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
1655+
}
1656+
16421657
/**
16431658
* Traffic listener notified of inbound and outbound {@link Command}s.
16441659
* <p>

src/main/java/com/rabbitmq/client/MetricsCollector.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,21 @@ public interface MetricsCollector {
3636

3737
void basicPublish(Channel channel, long deliveryTag);
3838

39-
void basicPublishFailure(Channel channel, Throwable cause);
39+
default void basicPublishFailure(Channel channel, Throwable cause) {
4040

41-
void basicPublishAck(Channel channel, long deliveryTag, boolean multiple);
41+
}
4242

43-
void basicPublishNack(Channel channel, long deliveryTag, boolean multiple);
43+
default void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) {
4444

45-
void basicPublishUnrouted(Channel channel);
45+
}
46+
47+
default void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) {
48+
49+
}
50+
51+
default void basicPublishUnrouted(Channel channel) {
52+
53+
}
4654

4755
void consumedMessage(Channel channel, long deliveryTag, boolean autoAck);
4856

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -397,35 +397,35 @@ public void start()
397397
}
398398

399399
try {
400-
int channelMax =
400+
int negotiatedChannelMax =
401401
negotiateChannelMax(this.requestedChannelMax,
402402
connTune.getChannelMax());
403403

404-
if (!checkUnsignedShort(channelMax)) {
405-
throw new IllegalArgumentException("Negotiated channel max must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + channelMax);
404+
if (!checkUnsignedShort(negotiatedChannelMax)) {
405+
throw new IllegalArgumentException("Negotiated channel max must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + negotiatedChannelMax);
406406
}
407407

408-
_channelManager = instantiateChannelManager(channelMax, threadFactory);
408+
_channelManager = instantiateChannelManager(negotiatedChannelMax, threadFactory);
409409

410410
int frameMax =
411411
negotiatedMaxValue(this.requestedFrameMax,
412412
connTune.getFrameMax());
413413
this._frameMax = frameMax;
414414

415-
int heartbeat =
415+
int negotiatedHeartbeat =
416416
negotiatedMaxValue(this.requestedHeartbeat,
417417
connTune.getHeartbeat());
418418

419-
if (!checkUnsignedShort(heartbeat)) {
420-
throw new IllegalArgumentException("Negotiated heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + heartbeat);
419+
if (!checkUnsignedShort(negotiatedHeartbeat)) {
420+
throw new IllegalArgumentException("Negotiated heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + negotiatedHeartbeat);
421421
}
422422

423-
setHeartbeat(heartbeat);
423+
setHeartbeat(negotiatedHeartbeat);
424424

425425
_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
426-
.channelMax(channelMax)
426+
.channelMax(negotiatedChannelMax)
427427
.frameMax(frameMax)
428-
.heartbeat(heartbeat)
428+
.heartbeat(negotiatedHeartbeat)
429429
.build());
430430
_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
431431
.virtualHost(_virtualHost)

src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,9 @@ public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple
151151
@Override
152152
public void basicPublishUnrouted(Channel channel) {
153153
try {
154-
markPublishedMessageNotRouted();
154+
markPublishedMessageUnrouted();
155155
} catch(Exception e) {
156-
LOGGER.info("Error while computing metrics in markPublishedMessageNotRouted: " + e.getMessage());
156+
LOGGER.info("Error while computing metrics in markPublishedMessageUnrouted: " + e.getMessage());
157157
}
158158
}
159159

@@ -420,5 +420,5 @@ private ChannelState(Channel channel) {
420420
/**
421421
* Marks the event of a published message not being routed.
422422
*/
423-
protected abstract void markPublishedMessageNotRouted();
423+
protected abstract void markPublishedMessageUnrouted();
424424
}

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.rabbitmq.client.SaslConfig;
2222
import com.rabbitmq.client.ShutdownSignalException;
2323
import com.rabbitmq.client.TrafficListener;
24+
import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier;
2425
import com.rabbitmq.client.impl.recovery.RetryHandler;
2526
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2627

@@ -54,7 +55,7 @@ public class ConnectionParams {
5455
private TopologyRecoveryFilter topologyRecoveryFilter;
5556
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
5657
private RetryHandler topologyRecoveryRetryHandler;
57-
58+
private RecoveredQueueNameSupplier recoveredQueueNameSupplier;
5859
private ExceptionHandler exceptionHandler;
5960
private ThreadFactory threadFactory;
6061

@@ -272,6 +273,14 @@ public RetryHandler getTopologyRecoveryRetryHandler() {
272273
return topologyRecoveryRetryHandler;
273274
}
274275

276+
public void setRecoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
277+
this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
278+
}
279+
280+
public RecoveredQueueNameSupplier getRecoveredQueueNameSupplier() {
281+
return recoveredQueueNameSupplier;
282+
}
283+
275284
public void setTrafficListener(TrafficListener trafficListener) {
276285
this.trafficListener = trafficListener;
277286
}

src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ protected void markMessagePublishNotAcknowledged() {
148148
}
149149

150150
@Override
151-
protected void markPublishedMessageNotRouted() {
151+
protected void markPublishedMessageUnrouted() {
152152
unroutedPublishedMessages.increment();
153153
}
154154

src/main/java/com/rabbitmq/client/impl/OAuth2ClientCredentialsGrantCredentialsProvider.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -595,5 +595,4 @@ private SSLSocketFactory sslSocketFactory() {
595595
}
596596

597597
}
598-
599598
}

src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ protected void markMessagePublishNotAcknowledged() {
125125
}
126126

127127
@Override
128-
protected void markPublishedMessageNotRouted() {
128+
protected void markPublishedMessageUnrouted() {
129129
publishUnroutedMessages.mark();
130130
}
131131

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,8 @@ public void queueDeclareNoWait(String queue,
355355
durable(durable).
356356
exclusive(exclusive).
357357
autoDelete(autoDelete).
358-
arguments(arguments);
358+
arguments(arguments).
359+
recoveredQueueNameSupplier(connection.getRecoveredQueueNameSupplier());
359360
delegate.queueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments);
360361
recordQueue(queue, meta);
361362

@@ -848,7 +849,8 @@ private void recordQueue(AMQP.Queue.DeclareOk ok, String queue, boolean durable,
848849
durable(durable).
849850
exclusive(exclusive).
850851
autoDelete(autoDelete).
851-
arguments(arguments);
852+
arguments(arguments).
853+
recoveredQueueNameSupplier(connection.getRecoveredQueueNameSupplier());
852854
if (queue.equals(RecordedQueue.EMPTY_STRING)) {
853855
q.serverNamed(true);
854856
}

0 commit comments

Comments
 (0)