diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchLoggingErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchLoggingErrorHandler.java index 55c7895d65..4271a266a7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchLoggingErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchLoggingErrorHandler.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.core.log.LogAccessor; +import org.springframework.lang.Nullable; /** * Simple handler that invokes a {@link LoggingErrorHandler} for each record. @@ -35,7 +36,7 @@ public class BatchLoggingErrorHandler implements BatchErrorHandler { new LogAccessor(LogFactory.getLog(BatchLoggingErrorHandler.class)); @Override - public void handle(Exception thrownException, ConsumerRecords data) { + public void handle(Exception thrownException, @Nullable ConsumerRecords data) { StringBuilder message = new StringBuilder("Error while processing:\n"); if (data == null) { message.append("null "); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareBatchErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareBatchErrorHandler.java index 6325529865..6936f7e584 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareBatchErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareBatchErrorHandler.java @@ -19,6 +19,8 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.springframework.lang.Nullable; + /** * An error handler that has access to the consumer, for example to adjust * offsets after an error. @@ -31,7 +33,7 @@ public interface ConsumerAwareBatchErrorHandler extends BatchErrorHandler { @Override - default void handle(Exception thrownException, ConsumerRecords data) { + default void handle(Exception thrownException, @Nullable ConsumerRecords data) { throw new UnsupportedOperationException("Container should never call this"); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareErrorHandler.java index f4e478eac7..54db386b61 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareErrorHandler.java @@ -21,6 +21,8 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.lang.Nullable; + /** * An error handler that has access to the consumer, for example to adjust * offsets after an error. @@ -33,17 +35,17 @@ public interface ConsumerAwareErrorHandler extends ErrorHandler { @Override - default void handle(Exception thrownException, ConsumerRecord data) { + default void handle(Exception thrownException, @Nullable ConsumerRecord data) { throw new UnsupportedOperationException("Container should never call this"); } @Override - void handle(Exception thrownException, ConsumerRecord data, Consumer consumer); + void handle(Exception thrownException, @Nullable ConsumerRecord data, Consumer consumer); @Override - default void handle(Exception thrownException, List> data, Consumer consumer, + default void handle(Exception thrownException, @Nullable List> data, Consumer consumer, MessageListenerContainer container) { - handle(thrownException, null, consumer); // NOSONAR + handle(thrownException, null, consumer); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandler.java index bf8b27c302..9aa77692c1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandler.java @@ -38,7 +38,7 @@ public interface ErrorHandler extends GenericErrorHandler> */ default void handle(Exception thrownException, List> records, Consumer consumer, MessageListenerContainer container) { - handle(thrownException, null); // NOSONAR + handle(thrownException, null); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/GenericErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/GenericErrorHandler.java index 2f3c688b7f..050aa91dab 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/GenericErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/GenericErrorHandler.java @@ -18,6 +18,8 @@ import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.lang.Nullable; + /** * A generic error handler. * @@ -35,7 +37,7 @@ public interface GenericErrorHandler { * @param thrownException The exception. * @param data the data. */ - void handle(Exception thrownException, T data); + void handle(Exception thrownException, @Nullable T data); /** * Handle the exception. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingErrorHandler.java index 41e1878821..ddbfe11bd9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingErrorHandler.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.core.log.LogAccessor; +import org.springframework.lang.Nullable; import org.springframework.util.ObjectUtils; /** @@ -33,7 +34,7 @@ public class LoggingErrorHandler implements ErrorHandler { private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(LoggingErrorHandler.class)); @Override - public void handle(Exception thrownException, ConsumerRecord record) { + public void handle(Exception thrownException, @Nullable ConsumerRecord record) { LOGGER.error(thrownException, () -> "Error while processing: " + ObjectUtils.nullSafeToString(record)); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/RemainingRecordsErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/RemainingRecordsErrorHandler.java index 529189a434..8a3e171b8a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/RemainingRecordsErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/RemainingRecordsErrorHandler.java @@ -21,6 +21,8 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.lang.Nullable; + /** * An error handler that has access to the unprocessed records from the last poll * (including the failed record) and the consumer, for example to adjust offsets after an @@ -35,7 +37,7 @@ public interface RemainingRecordsErrorHandler extends ConsumerAwareErrorHandler { @Override - default void handle(Exception thrownException, ConsumerRecord data, Consumer consumer) { + default void handle(Exception thrownException, @Nullable ConsumerRecord data, Consumer consumer) { throw new UnsupportedOperationException("Container should never call this"); }