Skip to content

Commit 11e03c9

Browse files
committed
GH-2262: Fix Possible NPE
MLC can be null. Other Sonar fixes.
1 parent f7cbbfe commit 11e03c9

File tree

4 files changed

+57
-21
lines changed

4 files changed

+57
-21
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPausingBackOffHandler.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
*/
3232
public class ContainerPausingBackOffHandler implements BackOffHandler {
3333

34+
private final DefaultBackOffHandler defaultBackOffHandler = new DefaultBackOffHandler();
35+
3436
private final ListenerContainerPauseService pauser;
3537

3638
/**
@@ -43,7 +45,12 @@ public ContainerPausingBackOffHandler(ListenerContainerPauseService pauser) {
4345

4446
@Override
4547
public void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
46-
this.pauser.pause(container, Duration.ofMillis(nextBackOff));
48+
if (container == null) {
49+
this.defaultBackOffHandler.onNextBackOff(container, exception, nextBackOff);
50+
}
51+
else {
52+
this.pauser.pause(container, Duration.ofMillis(nextBackOff));
53+
}
4754
}
4855

4956
@Override
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.springframework.lang.Nullable;
20+
21+
/**
22+
* Default {@link BackOffHandler}; suspends the thread for the back off. If a container is
23+
* provided, {@link ListenerUtils#stoppableSleep(MessageListenerContainer, long)} is used,
24+
* to terminate the suspension if the container is stopped.
25+
*
26+
* @author Jan Marincek
27+
* @author Gary Russell
28+
* @since 2.9
29+
*/
30+
public class DefaultBackOffHandler implements BackOffHandler {
31+
32+
@Override
33+
public void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
34+
try {
35+
if (container == null) {
36+
Thread.sleep(nextBackOff);
37+
}
38+
else {
39+
ListenerUtils.stoppableSleep(container, nextBackOff);
40+
}
41+
}
42+
catch (InterruptedException e) {
43+
Thread.currentThread().interrupt();
44+
}
45+
}
46+
47+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -308,21 +308,4 @@ void setLastException(Exception lastException) {
308308

309309
}
310310

311-
static class DefaultBackOffHandler implements BackOffHandler {
312-
@Override
313-
public void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
314-
try {
315-
if (container == null) {
316-
Thread.sleep(nextBackOff);
317-
}
318-
else {
319-
ListenerUtils.stoppableSleep(container, nextBackOff);
320-
}
321-
}
322-
catch (InterruptedException e) {
323-
throw new RuntimeException(e);
324-
}
325-
}
326-
}
327-
328311
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerPauseService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.kafka.common.TopicPartition;
2626

2727
import org.springframework.core.log.LogAccessor;
28-
import org.springframework.lang.NonNull;
2928
import org.springframework.lang.Nullable;
3029
import org.springframework.scheduling.TaskScheduler;
3130
import org.springframework.util.Assert;
@@ -118,7 +117,7 @@ public void pausePartition(MessageListenerContainer messageListenerContainer, To
118117
* Resume the listener container by given id.
119118
* @param listenerId the id of the listener
120119
*/
121-
public void resume(@NonNull String listenerId) {
120+
public void resume(String listenerId) {
122121
Assert.notNull(this.registry, "Resume by id is only supported when a registry is provided");
123122
getListenerContainer(listenerId).ifPresent(this::resume);
124123
}
@@ -127,7 +126,7 @@ public void resume(@NonNull String listenerId) {
127126
* Resume the listener container.
128127
* @param messageListenerContainer the listener container
129128
*/
130-
public void resume(@NonNull MessageListenerContainer messageListenerContainer) {
129+
public void resume(MessageListenerContainer messageListenerContainer) {
131130
if (messageListenerContainer.isPauseRequested()) {
132131
LOGGER.debug(() -> "Resuming container " + messageListenerContainer);
133132
messageListenerContainer.resume();

0 commit comments

Comments
 (0)