A Spring Boot application cannot shut down gracefully (the shutdown times out after 30 s) when a KafkaListener in batch mode is configured with an infinite back-off policy. RetryingBatchErrorHandler (in combination with ErrorHandlingUtils) does not notice that the container should be shut down and remains in an endless retry loop. The loop also ignores the back-off sleep intervals, because of the container check inside ListenerUtils.stoppableSleep(container, nextBackOff);
The issue is easily reproducible: start a simple app (with some records in the consumed topic) and then try to stop it gracefully:
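The behaviour described above can be illustrated with a small stand-alone sketch. It is not the Spring Kafka source; the class `StoppableRetryDemo`, its methods, and the `maxAttempts` cap (added only so the demo terminates) are hypothetical names chosen for illustration. It assumes a sleep helper that returns early once a "running" flag is cleared, and compares a retry loop that never looks at that flag with one that does.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class StoppableRetryDemo {

    /** Sleeps up to intervalMs, but returns early once 'running' is false. */
    static void stoppableSleep(AtomicBoolean running, long intervalMs) {
        long deadline = System.currentTimeMillis() + intervalMs;
        try {
            while (running.get() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop sleeping.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Retries an always-failing task and returns the number of attempts.
     * With checkRunning == false the loop only ends at the artificial
     * maxAttempts cap, which mimics the endless loop from the report.
     */
    static int retry(AtomicBoolean running, boolean checkRunning,
                     int maxAttempts, long backOffMs) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++; // the simulated listener fails on every attempt
            stoppableSleep(running, backOffMs);
            if (checkRunning && !running.get()) {
                break; // stop retrying once the container is stopped
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        // The "container" has already been asked to stop.
        AtomicBoolean stopped = new AtomicBoolean(false);
        System.out.println(retry(stopped, false, 1000, 50)); // prints 1000
        System.out.println(retry(stopped, true, 1000, 50));  // prints 1
    }
}
```

In the first call the sleep returns immediately on every iteration, so the loop spins through all attempts without any back-off; in the second call the extra check ends the loop after a single attempt.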
package com.example.demokafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.stereotype.Service;
import org.springframework.util.backoff.ExponentialBackOff;

import java.util.List;

@SpringBootApplication
@EnableKafka
public class DemoKafkaApplication {

    public static void main(final String[] args) {
        SpringApplication.run(DemoKafkaApplication.class, args);
    }

    // error handler with an infinite back-off policy (retries never give up)
    @Bean
    public DefaultErrorHandler errorHandler() {
        return new DefaultErrorHandler(new ExponentialBackOff());
    }

    @Service
    public class DemoKafkaListener {

        private static final Logger logger = LoggerFactory.getLogger(DemoKafkaListener.class);

        @KafkaListener(topics = "demo-topic-001", groupId = "demo-group", batch = "true")
        public void listen(final List<String> values) {
            logger.info("Received values: {}", String.join(", ", values));
            // simulate error in process handling
            throw new RuntimeException();
        }
    }
}
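To show why the back-off in the reproducer never ends, here is a rough stand-alone sketch of an exponential back-off sequence. The values used (initial interval 2000 ms, multiplier 1.5, maximum interval 30000 ms, no limit on elapsed time) are what I understand the defaults of `new ExponentialBackOff()` to be; treat them as an assumption and check the Javadoc for your Spring version. The class `BackOffSketch` and its method are hypothetical and only mimic the arithmetic.

```java
import java.util.Arrays;

public class BackOffSketch {

    /**
     * Returns the first n back-off intervals: each interval is the previous
     * one times the multiplier, truncated to a long and capped at max.
     * The sequence has no stop condition, so it can be extended forever.
     */
    static long[] intervals(long initial, double multiplier, long max, int n) {
        long[] out = new long[n];
        long current = initial;
        for (int i = 0; i < n; i++) {
            out[i] = current;
            current = Math.min((long) (current * multiplier), max);
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed defaults: 2000 ms initial, x1.5, capped at 30000 ms.
        System.out.println(Arrays.toString(intervals(2000L, 1.5, 30000L, 8)));
        // prints [2000, 3000, 4500, 6750, 10125, 15187, 22780, 30000]
    }
}
```

Once the cap is reached, every further interval stays at the maximum, so with no limit on attempts or elapsed time the error handler keeps retrying until something else stops it.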
Error logs:
...
2022-03-21 10:16:40.763 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.s.c.BatchMessagingMessageConverter : No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders
2022-03-21 10:16:40.763 DEBUG 44789 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[test], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ae65efd, kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0], kafka_receivedMessageKey=[null], kafka_receivedTopic=[demo-topic-001], kafka_receivedTimestamp=[1647853602875], kafka_groupId=demo-group}]]
2022-03-21 10:16:40.763 INFO 44789 --- [ntainer#0-0-C-1] d.DemoKafkaApplication$DemoKafkaListener : Received values: test
2022-03-21 10:16:40.764 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler : Retry failed for: demo-topic-001-0@0
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(java.util.List<java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException; nested exception is java.lang.RuntimeException
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2264) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$42(KafkaMessageListenerContainer.java:2273) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.RetryingBatchErrorHandler.handle(RetryingBatchErrorHandler.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:153) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:141) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2271) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2109) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1952) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
Caused by: java.lang.RuntimeException: null
at com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(DemoKafkaApplication.java:36) ~[classes/:na]
at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.16.jar:5.3.16]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.16.jar:5.3.16]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
... 17 common frames omitted
2022-03-21 10:16:40.865 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.s.c.BatchMessagingMessageConverter : No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders
2022-03-21 10:16:40.865 DEBUG 44789 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[test], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ae65efd, kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0], kafka_receivedMessageKey=[null], kafka_receivedTopic=[demo-topic-001], kafka_receivedTimestamp=[1647853602875], kafka_groupId=demo-group}]]
2022-03-21 10:16:40.866 INFO 44789 --- [ntainer#0-0-C-1] d.DemoKafkaApplication$DemoKafkaListener : Received values: test
2022-03-21 10:16:40.866 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler : Retry failed for: demo-topic-001-0@0
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(java.util.List<java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException; nested exception is java.lang.RuntimeException
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2264) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$42(KafkaMessageListenerContainer.java:2273) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.RetryingBatchErrorHandler.handle(RetryingBatchErrorHandler.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:153) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:141) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2271) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2109) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1952) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
Caused by: java.lang.RuntimeException: null
at com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(DemoKafkaApplication.java:36) ~[classes/:na]
at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.16.jar:5.3.16]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.16.jar:5.3.16]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
... 17 common frames omitted
2022-03-21 10:16:40.967 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.s.c.BatchMessagingMessageConverter : No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders
2022-03-21 10:16:40.967 DEBUG 44789 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[test], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ae65efd, kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0], kafka_receivedMessageKey=[null], kafka_receivedTopic=[demo-topic-001], kafka_receivedTimestamp=[1647853602875], kafka_groupId=demo-group}]]
2022-03-21 10:16:40.968 INFO 44789 --- [ntainer#0-0-C-1] d.DemoKafkaApplication$DemoKafkaListener : Received values: test
2022-03-21 10:16:40.968 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler : Retry failed for: demo-topic-001-0@0
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(java.util.List<java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException; nested exception is java.lang.RuntimeException
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2264) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$42(KafkaMessageListenerContainer.java:2273) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.RetryingBatchErrorHandler.handle(RetryingBatchErrorHandler.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:153) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:141) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2271) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2109) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1952) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
Caused by: java.lang.RuntimeException: null
at com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(DemoKafkaApplication.java:36) ~[classes/:na]
at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.16.jar:5.3.16]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.16.jar:5.3.16]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
... 17 common frames omitted
2022-03-21 10:16:41.069 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.s.c.BatchMessagingMessageConverter : No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders
2022-03-21 10:16:41.069 DEBUG 44789 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[test], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ae65efd, kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0], kafka_receivedMessageKey=[null], kafka_receivedTopic=[demo-topic-001], kafka_receivedTimestamp=[1647853602875], kafka_groupId=demo-group}]]
2022-03-21 10:16:41.069 INFO 44789 --- [ntainer#0-0-C-1] d.DemoKafkaApplication$DemoKafkaListener : Received values: test
2022-03-21 10:16:41.070 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler : Retry failed for: demo-topic-001-0@0
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(java.util.List<java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException; nested exception is java.lang.RuntimeException
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2264) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$42(KafkaMessageListenerContainer.java:2273) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.RetryingBatchErrorHandler.handle(RetryingBatchErrorHandler.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:153) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:141) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2271) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2109) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1952) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
Caused by: java.lang.RuntimeException: null
at com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(DemoKafkaApplication.java:36) ~[classes/:na]
at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.16.jar:5.3.16]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.16.jar:5.3.16]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
... 17 common frames omitted
2022-03-21 10:16:41.171 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.s.c.BatchMessagingMessageConverter : No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders
2022-03-21 10:16:41.171 DEBUG 44789 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[test], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ae65efd, kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0], kafka_receivedMessageKey=[null], kafka_receivedTopic=[demo-topic-001], kafka_receivedTimestamp=[1647853602875], kafka_groupId=demo-group}]]
2022-03-21 10:16:41.172 INFO 44789 --- [ntainer#0-0-C-1] d.DemoKafkaApplication$DemoKafkaListener : Received values: test
2022-03-21 10:16:41.172 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler : Retry failed for: demo-topic-001-0@0
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(java.util.List<java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException; nested exception is java.lang.RuntimeException
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2264) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$42(KafkaMessageListenerContainer.java:2273) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.RetryingBatchErrorHandler.handle(RetryingBatchErrorHandler.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:153) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:141) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2271) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2109) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1952) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
Caused by: java.lang.RuntimeException: null
at com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(DemoKafkaApplication.java:36) ~[classes/:na]
at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.16.jar:5.3.16]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.16.jar:5.3.16]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
... 17 common frames omitted
2022-03-21 10:16:41.273 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.s.c.BatchMessagingMessageConverter : No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders
2022-03-21 10:16:41.273 DEBUG 44789 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[test], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ae65efd, kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0], kafka_receivedMessageKey=[null], kafka_receivedTopic=[demo-topic-001], kafka_receivedTimestamp=[1647853602875], kafka_groupId=demo-group}]]
2022-03-21 10:16:41.273 INFO 44789 --- [ntainer#0-0-C-1] d.DemoKafkaApplication$DemoKafkaListener : Received values: test
2022-03-21 10:16:41.274 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler : Retry failed for: demo-topic-001-0@0
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(java.util.List<java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException; nested exception is java.lang.RuntimeException
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2264) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$42(KafkaMessageListenerContainer.java:2273) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.RetryingBatchErrorHandler.handle(RetryingBatchErrorHandler.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:153) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:141) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2271) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2109) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1952) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
Caused by: java.lang.RuntimeException: null
at com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(DemoKafkaApplication.java:36) ~[classes/:na]
at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.16.jar:5.3.16]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.16.jar:5.3.16]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
... 17 common frames omitted
2022-03-21 10:16:41.347 INFO 44789 --- [ionShutdownHook] o.s.c.support.DefaultLifecycleProcessor : Failed to shut down 1 bean with phase value 2147483547 within timeout of 30000ms: [org.springframework.kafka.config.internalKafkaListenerEndpointRegistry]
Process finished with exit code 130 (interrupted by signal 2: SIGINT)
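To make the reported behaviour concrete, here is a minimal, self-contained sketch of a retry loop that re-checks a stop flag before every attempt and during the back-off sleep. It is a simplified model and not the spring-kafka implementation: `StoppableRetrySketch`, `retryWhileRunning` and the `AtomicBoolean running` flag are hypothetical names standing in for the container's running state, and the local `stoppableSleep` only imitates the idea behind the `ListenerUtils.stoppableSleep(container, nextBackOff)` call mentioned in the report.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class StoppableRetrySketch {

    /**
     * Retries the listener until it succeeds, the stop flag is cleared,
     * or the thread is interrupted. Returns the number of attempts made.
     */
    static int retryWhileRunning(Runnable listener, AtomicBoolean running, long backOffMillis) {
        int attempts = 0;
        // Re-check the stop flag before every attempt, not only while sleeping.
        while (running.get() && !Thread.currentThread().isInterrupted()) {
            attempts++;
            try {
                listener.run();
                return attempts; // success ends the retry loop
            } catch (RuntimeException e) {
                stoppableSleep(running, backOffMillis);
            }
        }
        return attempts;
    }

    /** Sleeps in small slices so that a stop request cuts the back-off short. */
    static void stoppableSleep(AtomicBoolean running, long millis) {
        long deadline = System.currentTimeMillis() + millis;
        try {
            while (running.get() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean running = new AtomicBoolean(true);
        Thread consumer = new Thread(() -> {
            // The listener always fails, so only the stop flag can end the loop.
            int attempts = retryWhileRunning(() -> {
                throw new RuntimeException("simulated listener failure");
            }, running, 100);
            System.out.println("Retry loop exited after " + attempts + " attempts");
        });
        consumer.start();
        Thread.sleep(350);   // let a few retries happen
        running.set(false);  // simulate the container being asked to stop
        consumer.join(1000);
        System.out.println("Consumer thread still alive: " + consumer.isAlive());
    }
}
```

In this model, dropping the `running.get()` check from the `while` condition in `retryWhileRunning` leaves a loop that never ends for an always-failing listener with an unbounded back-off, even though each sleep returns early once the flag is cleared. That is the pattern the logs above suggest.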
Using: Spring for Apache Kafka (spring-kafka) 2.8.3