Skip to content

KafkaMessageListenerContainer shutdown blocked by endless RetryingBatchErrorHandler #2179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
vavrekm opened this issue Mar 21, 2022 · 0 comments · Fixed by #2180
Closed

KafkaMessageListenerContainer shutdown blocked by endless RetryingBatchErrorHandler #2179

vavrekm opened this issue Mar 21, 2022 · 0 comments · Fixed by #2180

Comments

@vavrekm
Copy link

vavrekm commented Mar 21, 2022

Using: Apache Kafka 2.8.3

Spring boot application is unable to shutdown (and timeouted in 30s) when KafkaListener in batch mode has setup infinite backoff policy. RetryingBatchErrorHandler (in combination with ErrorHandlingUtils) did not noticed that container should be shutdown and remain in endless loop also with ignoring sleep intervals (as there is a check inside ListenerUtils.stoppableSleep(container, nextBackOff);)

Issue is easily reproducable - just start simple app (with some records in consumed topic) and then try to correctly stop it:

package com.example.demokafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.stereotype.Service;
import org.springframework.util.backoff.ExponentialBackOff;

import java.util.List;

@SpringBootApplication
@EnableKafka
public class DemoKafkaApplication {
    public static void main(final String[] args) {
        SpringApplication.run(DemoKafkaApplication.class, args);
    }

    @Bean
    public DefaultErrorHandler errorHandler() {
        return new DefaultErrorHandler(new ExponentialBackOff());
    }

    @Service
    public class DemoKafkaListener {
        private static final Logger logger = LoggerFactory.getLogger(com.example.demokafka.DemoKafkaApplication.DemoKafkaListener.class);

        @KafkaListener(topics = "demo-topic-001", groupId = "demo-group", batch = "true")
        public void listen(final List<String> values) {
            logger.info("Received values: {}", String.join(", ", values));
            // simulate error in process handling
            throw new RuntimeException();
        }
    }
}

Error logs:

...
2022-03-21 10:16:40.763 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.s.c.BatchMessagingMessageConverter : No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders
2022-03-21 10:16:40.763 DEBUG 44789 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[test], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ae65efd, kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0], kafka_receivedMessageKey=[null], kafka_receivedTopic=[demo-topic-001], kafka_receivedTimestamp=[1647853602875], kafka_groupId=demo-group}]]
2022-03-21 10:16:40.763  INFO 44789 --- [ntainer#0-0-C-1] d.DemoKafkaApplication$DemoKafkaListener : Received values: test
2022-03-21 10:16:40.764 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler        : Retry failed for: demo-topic-001-0@0

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(java.util.List<java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException; nested exception is java.lang.RuntimeException
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2264) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$42(KafkaMessageListenerContainer.java:2273) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.RetryingBatchErrorHandler.handle(RetryingBatchErrorHandler.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:153) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:141) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2271) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2109) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1952) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
	Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
Caused by: java.lang.RuntimeException: null
	at com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(DemoKafkaApplication.java:36) ~[classes/:na]
	at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.16.jar:5.3.16]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.16.jar:5.3.16]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
	... 17 common frames omitted

2022-03-21 10:16:40.865 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.s.c.BatchMessagingMessageConverter : No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders
2022-03-21 10:16:40.865 DEBUG 44789 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[test], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ae65efd, kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0], kafka_receivedMessageKey=[null], kafka_receivedTopic=[demo-topic-001], kafka_receivedTimestamp=[1647853602875], kafka_groupId=demo-group}]]
2022-03-21 10:16:40.866  INFO 44789 --- [ntainer#0-0-C-1] d.DemoKafkaApplication$DemoKafkaListener : Received values: test
2022-03-21 10:16:40.866 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler        : Retry failed for: demo-topic-001-0@0

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(java.util.List<java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException; nested exception is java.lang.RuntimeException
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2264) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$42(KafkaMessageListenerContainer.java:2273) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.RetryingBatchErrorHandler.handle(RetryingBatchErrorHandler.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:153) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:141) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2271) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2109) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1952) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
	Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
Caused by: java.lang.RuntimeException: null
	at com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(DemoKafkaApplication.java:36) ~[classes/:na]
	at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.16.jar:5.3.16]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.16.jar:5.3.16]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
	... 17 common frames omitted

2022-03-21 10:16:40.967 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.s.c.BatchMessagingMessageConverter : No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders
2022-03-21 10:16:40.967 DEBUG 44789 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[test], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ae65efd, kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0], kafka_receivedMessageKey=[null], kafka_receivedTopic=[demo-topic-001], kafka_receivedTimestamp=[1647853602875], kafka_groupId=demo-group}]]
2022-03-21 10:16:40.968  INFO 44789 --- [ntainer#0-0-C-1] d.DemoKafkaApplication$DemoKafkaListener : Received values: test
2022-03-21 10:16:40.968 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler        : Retry failed for: demo-topic-001-0@0

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(java.util.List<java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException; nested exception is java.lang.RuntimeException
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2264) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$42(KafkaMessageListenerContainer.java:2273) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.RetryingBatchErrorHandler.handle(RetryingBatchErrorHandler.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:153) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:141) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2271) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2109) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1952) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
	Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
Caused by: java.lang.RuntimeException: null
	at com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(DemoKafkaApplication.java:36) ~[classes/:na]
	at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.16.jar:5.3.16]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.16.jar:5.3.16]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
	... 17 common frames omitted

2022-03-21 10:16:41.069 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.s.c.BatchMessagingMessageConverter : No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders
2022-03-21 10:16:41.069 DEBUG 44789 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[test], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ae65efd, kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0], kafka_receivedMessageKey=[null], kafka_receivedTopic=[demo-topic-001], kafka_receivedTimestamp=[1647853602875], kafka_groupId=demo-group}]]
2022-03-21 10:16:41.069  INFO 44789 --- [ntainer#0-0-C-1] d.DemoKafkaApplication$DemoKafkaListener : Received values: test
2022-03-21 10:16:41.070 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler        : Retry failed for: demo-topic-001-0@0

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(java.util.List<java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException; nested exception is java.lang.RuntimeException
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2264) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$42(KafkaMessageListenerContainer.java:2273) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.RetryingBatchErrorHandler.handle(RetryingBatchErrorHandler.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:153) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:141) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2271) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2109) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1952) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
	Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
Caused by: java.lang.RuntimeException: null
	at com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(DemoKafkaApplication.java:36) ~[classes/:na]
	at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.16.jar:5.3.16]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.16.jar:5.3.16]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
	... 17 common frames omitted

2022-03-21 10:16:41.171 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.s.c.BatchMessagingMessageConverter : No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders
2022-03-21 10:16:41.171 DEBUG 44789 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[test], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ae65efd, kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0], kafka_receivedMessageKey=[null], kafka_receivedTopic=[demo-topic-001], kafka_receivedTimestamp=[1647853602875], kafka_groupId=demo-group}]]
2022-03-21 10:16:41.172  INFO 44789 --- [ntainer#0-0-C-1] d.DemoKafkaApplication$DemoKafkaListener : Received values: test
2022-03-21 10:16:41.172 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler        : Retry failed for: demo-topic-001-0@0

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(java.util.List<java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException; nested exception is java.lang.RuntimeException
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2264) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$42(KafkaMessageListenerContainer.java:2273) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.RetryingBatchErrorHandler.handle(RetryingBatchErrorHandler.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:153) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:141) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2271) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2109) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1952) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
	Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
Caused by: java.lang.RuntimeException: null
	at com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(DemoKafkaApplication.java:36) ~[classes/:na]
	at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.16.jar:5.3.16]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.16.jar:5.3.16]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
	... 17 common frames omitted

2022-03-21 10:16:41.273 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.s.c.BatchMessagingMessageConverter : No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders
2022-03-21 10:16:41.273 DEBUG 44789 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[test], headers={kafka_offset=[0], kafka_nativeHeaders=[RecordHeaders(headers = [], isReadOnly = false)], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ae65efd, kafka_conversionFailures=[], kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0], kafka_receivedMessageKey=[null], kafka_receivedTopic=[demo-topic-001], kafka_receivedTimestamp=[1647853602875], kafka_groupId=demo-group}]]
2022-03-21 10:16:41.273  INFO 44789 --- [ntainer#0-0-C-1] d.DemoKafkaApplication$DemoKafkaListener : Received values: test
2022-03-21 10:16:41.274 DEBUG 44789 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler        : Retry failed for: demo-topic-001-0@0

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(java.util.List<java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException; nested exception is java.lang.RuntimeException
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2634) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2264) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$42(KafkaMessageListenerContainer.java:2273) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.RetryingBatchErrorHandler.handle(RetryingBatchErrorHandler.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:153) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:78) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:141) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2271) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2109) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1952) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1354) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
	Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
Caused by: java.lang.RuntimeException: null
	at com.example.demokafka.DemoKafkaApplication$DemoKafkaListener.listen(DemoKafkaApplication.java:36) ~[classes/:na]
	at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.16.jar:5.3.16]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.16.jar:5.3.16]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2244) ~[spring-kafka-2.8.3.jar:2.8.3]
	... 17 common frames omitted

2022-03-21 10:16:41.347  INFO 44789 --- [ionShutdownHook] o.s.c.support.DefaultLifecycleProcessor  : Failed to shut down 1 bean with phase value 2147483547 within timeout of 30000ms: [org.springframework.kafka.config.internalKafkaListenerEndpointRegistry]

Process finished with exit code 130 (interrupted by signal 2: SIGINT)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants