Skip to content

Commit a657371

Browse files
authored
Data in handle method is @nullable (#1837)
* Data in handle method is @nullable * Data in handle method is @nullable * Proper nullability in ConsumerAwareErrorHandler
1 parent e624891 commit a657371

File tree

7 files changed

+20
-10
lines changed

7 files changed

+20
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchLoggingErrorHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.clients.consumer.ConsumerRecords;
2323

2424
import org.springframework.core.log.LogAccessor;
25+
import org.springframework.lang.Nullable;
2526

2627
/**
2728
* Simple handler that invokes a {@link LoggingErrorHandler} for each record.
@@ -35,7 +36,7 @@ public class BatchLoggingErrorHandler implements BatchErrorHandler {
3536
new LogAccessor(LogFactory.getLog(BatchLoggingErrorHandler.class));
3637

3738
@Override
38-
public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
39+
public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data) {
3940
StringBuilder message = new StringBuilder("Error while processing:\n");
4041
if (data == null) {
4142
message.append("null ");

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareBatchErrorHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.apache.kafka.clients.consumer.Consumer;
2020
import org.apache.kafka.clients.consumer.ConsumerRecords;
2121

22+
import org.springframework.lang.Nullable;
23+
2224
/**
2325
* An error handler that has access to the consumer, for example to adjust
2426
* offsets after an error.
@@ -31,7 +33,7 @@
3133
public interface ConsumerAwareBatchErrorHandler extends BatchErrorHandler {
3234

3335
@Override
34-
default void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
36+
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data) {
3537
throw new UnsupportedOperationException("Container should never call this");
3638
}
3739

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareErrorHandler.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.kafka.clients.consumer.Consumer;
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
2323

24+
import org.springframework.lang.Nullable;
25+
2426
/**
2527
* An error handler that has access to the consumer, for example to adjust
2628
* offsets after an error.
@@ -33,17 +35,17 @@
3335
public interface ConsumerAwareErrorHandler extends ErrorHandler {
3436

3537
@Override
36-
default void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
38+
default void handle(Exception thrownException, @Nullable ConsumerRecord<?, ?> data) {
3739
throw new UnsupportedOperationException("Container should never call this");
3840
}
3941

4042
@Override
41-
void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer);
43+
void handle(Exception thrownException, @Nullable ConsumerRecord<?, ?> data, Consumer<?, ?> consumer);
4244

4345
@Override
44-
default void handle(Exception thrownException, List<ConsumerRecord<?, ?>> data, Consumer<?, ?> consumer,
46+
default void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> data, Consumer<?, ?> consumer,
4547
MessageListenerContainer container) {
46-
handle(thrownException, null, consumer); // NOSONAR
48+
handle(thrownException, null, consumer);
4749
}
4850

4951
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public interface ErrorHandler extends GenericErrorHandler<ConsumerRecord<?, ?>>
3838
*/
3939
default void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
4040
MessageListenerContainer container) {
41-
handle(thrownException, null); // NOSONAR
41+
handle(thrownException, null);
4242
}
4343

4444
}

spring-kafka/src/main/java/org/springframework/kafka/listener/GenericErrorHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import org.apache.kafka.clients.consumer.Consumer;
2020

21+
import org.springframework.lang.Nullable;
22+
2123
/**
2224
* A generic error handler.
2325
*
@@ -35,7 +37,7 @@ public interface GenericErrorHandler<T> {
3537
* @param thrownException The exception.
3638
* @param data the data.
3739
*/
38-
void handle(Exception thrownException, T data);
40+
void handle(Exception thrownException, @Nullable T data);
3941

4042
/**
4143
* Handle the exception.

spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingErrorHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.clients.consumer.ConsumerRecord;
2121

2222
import org.springframework.core.log.LogAccessor;
23+
import org.springframework.lang.Nullable;
2324
import org.springframework.util.ObjectUtils;
2425

2526
/**
@@ -33,7 +34,7 @@ public class LoggingErrorHandler implements ErrorHandler {
3334
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(LoggingErrorHandler.class));
3435

3536
@Override
36-
public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
37+
public void handle(Exception thrownException, @Nullable ConsumerRecord<?, ?> record) {
3738
LOGGER.error(thrownException, () -> "Error while processing: " + ObjectUtils.nullSafeToString(record));
3839
}
3940

spring-kafka/src/main/java/org/springframework/kafka/listener/RemainingRecordsErrorHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.kafka.clients.consumer.Consumer;
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
2323

24+
import org.springframework.lang.Nullable;
25+
2426
/**
2527
* An error handler that has access to the unprocessed records from the last poll
2628
* (including the failed record) and the consumer, for example to adjust offsets after an
@@ -35,7 +37,7 @@
3537
public interface RemainingRecordsErrorHandler extends ConsumerAwareErrorHandler {
3638

3739
@Override
38-
default void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
40+
default void handle(Exception thrownException, @Nullable ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
3941
throw new UnsupportedOperationException("Container should never call this");
4042
}
4143

0 commit comments

Comments
 (0)