Skip to content

Commit de6b708

Browse files
committed
Null-proof channel state retrieval in AbstractMetricsCollector
A channel message may have been closed by the time a metrics method is called (e.g. when cancelling a consumer the metrics collector is called after the RPC is completed), so the channel state may no longer be there. This commit null-proofs methods rely on the channel state retrieval. Fixes #1592
1 parent e8c193a commit de6b708

File tree

1 file changed

+19
-4
lines changed

1 file changed

+19
-4
lines changed

src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,12 @@ public void basicPublish(Channel channel, long deliveryTag) {
115115
try {
116116
if (deliveryTag != 0) {
117117
ChannelState channelState = channelState(channel);
118+
if (channelState == null) {
119+
return;
120+
}
118121
channelState.lock.lock();
119122
try {
120-
channelState(channel).unconfirmedMessageDeliveryTags.add(deliveryTag);
123+
channelState.unconfirmedMessageDeliveryTags.add(deliveryTag);
121124
} finally {
122125
channelState.lock.unlock();
123126
}
@@ -169,9 +172,12 @@ public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {
169172
try {
170173
if(!autoAck) {
171174
ChannelState channelState = channelState(channel);
175+
if (channelState == null) {
176+
return;
177+
}
172178
channelState.lock.lock();
173179
try {
174-
channelState(channel).consumersWithManualAck.add(consumerTag);
180+
channelState.consumersWithManualAck.add(consumerTag);
175181
} finally {
176182
channelState.lock.unlock();
177183
}
@@ -185,9 +191,12 @@ public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {
185191
public void basicCancel(Channel channel, String consumerTag) {
186192
try {
187193
ChannelState channelState = channelState(channel);
194+
if (channelState == null) {
195+
return;
196+
}
188197
channelState.lock.lock();
189198
try {
190-
channelState(channel).consumersWithManualAck.remove(consumerTag);
199+
channelState.consumersWithManualAck.remove(consumerTag);
191200
} finally {
192201
channelState.lock.unlock();
193202
}
@@ -202,9 +211,12 @@ public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck)
202211
markConsumedMessage();
203212
if(!autoAck) {
204213
ChannelState channelState = channelState(channel);
214+
if (channelState == null) {
215+
return;
216+
}
205217
channelState.lock.lock();
206218
try {
207-
channelState(channel).unackedMessageDeliveryTags.add(deliveryTag);
219+
channelState.unackedMessageDeliveryTags.add(deliveryTag);
208220
} finally {
209221
channelState.lock.unlock();
210222
}
@@ -219,6 +231,9 @@ public void consumedMessage(Channel channel, long deliveryTag, String consumerTa
219231
try {
220232
markConsumedMessage();
221233
ChannelState channelState = channelState(channel);
234+
if (channelState == null) {
235+
return;
236+
}
222237
channelState.lock.lock();
223238
try {
224239
if(channelState.consumersWithManualAck.contains(consumerTag)) {

0 commit comments

Comments
 (0)