Skip to content

GH-2387: Fix FallbackBatchErrorHandler Events #2420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,23 @@ public List<KafkaMessageListenerContainer<K, V>> getContainers() {
}
}

@Override
public MessageListenerContainer getContainerFor(String topic, int partition) {
synchronized (this.lifecycleMonitor) {
for (KafkaMessageListenerContainer<K, V> container : this.containers) {
Collection<TopicPartition> assignedPartitions = container.getAssignedPartitions();
if (assignedPartitions != null) {
for (TopicPartition part : assignedPartitions) {
if (part.topic().equals(topic) && part.partition() == partition) {
return container;
}
}
}
}
return this;
}
}

@Override
public Collection<TopicPartition> getAssignedPartitions() {
synchronized (this.lifecycleMonitor) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import java.util.Collection;

import org.apache.kafka.common.TopicPartition;

/**
* Objects that can publish consumer pause/resume events.
*
* @author Gary Russell
* @since 2.8.10
*
*/
public interface ConsumerPauseResumeEventPublisher {

/**
* Publish a consumer paused event.
* @param partitions the paused partitions.
* @param reason the reason.
*/
void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason);

/**
* Publish a consumer resumed event.
* @param partitions the resumed partitions.
*/
void publishConsumerResumedEvent(Collection<TopicPartition> partitions);

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

Expand Down Expand Up @@ -71,8 +72,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
consumer.pause(assignment);
int attempt = 1;
listen(retryListeners, records, thrownException, attempt++);
if (container instanceof KafkaMessageListenerContainer) {
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerPausedEvent(assignment, "For batch retry");
ConsumerRecord<?, ?> first = records.iterator().next();
MessageListenerContainer childOrSingle = container.getContainerFor(first.topic(), first.partition());
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher) {
((ConsumerPauseResumeEventPublisher) childOrSingle)
.publishConsumerPausedEvent(assignment, "For batch retry");
}
try {
while (nextBackOff != BackOffExecution.STOP) {
Expand Down Expand Up @@ -115,8 +119,8 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
finally {
Set<TopicPartition> assignment2 = consumer.assignment();
consumer.resume(assignment2);
if (container instanceof KafkaMessageListenerContainer) {
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerResumedEvent(assignment2);
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher) {
((ConsumerPauseResumeEventPublisher) childOrSingle).publishConsumerResumedEvent(assignment2);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@
* @author Daniel Gentes
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> {
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {

private static final String UNUSED = "unused";

Expand Down Expand Up @@ -466,15 +466,17 @@ private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<
}
}

void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason) {
@Override
public void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason) {
ApplicationEventPublisher publisher = getApplicationEventPublisher();
if (publisher != null) {
publisher.publishEvent(new ConsumerPausedEvent(this, this.thisOrParentContainer,
Collections.unmodifiableCollection(partitions), reason));
}
}

void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
@Override
public void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
ApplicationEventPublisher publisher = getApplicationEventPublisher();
if (publisher != null) {
publisher.publishEvent(new ConsumerResumedEvent(this, this.thisOrParentContainer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,17 @@ default void stopAbnormally(Runnable callback) {
stop(callback);
}

/**
* If this container has child containers, return the child container that is assigned
* the topic/partition. Return this when there are no child containers.
* @param topic the topic.
* @param partition the partition.
* @return the container.
*/
default MessageListenerContainer getContainerFor(String topic, int partition) {
return this;
}

@Override
default void destroy() {
stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand All @@ -32,10 +34,14 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.event.ConsumerPausedEvent;
import org.springframework.kafka.event.ConsumerResumedEvent;
import org.springframework.kafka.event.ConsumerStoppedEvent;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
Expand Down Expand Up @@ -88,8 +94,8 @@ public void testRetriesAndDlt() throws InterruptedException {
throw new ListenerExecutionFailedException("fail for retry batch");
});

KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
container.setBeanName("retryBatch");
final CountDownLatch recoverLatch = new CountDownLatch(1);
final AtomicReference<String> failedGroupId = new AtomicReference<>();
Expand All @@ -110,10 +116,21 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
FallbackBatchErrorHandler errorHandler = new FallbackBatchErrorHandler(new FixedBackOff(0L, 3), recoverer);
container.setCommonErrorHandler(errorHandler);
final CountDownLatch stopLatch = new CountDownLatch(1);
container.setApplicationEventPublisher(e -> {
if (e instanceof ConsumerStoppedEvent) {
stopLatch.countDown();
List<ApplicationEvent> events = new ArrayList<>();
container.setApplicationEventPublisher(new ApplicationEventPublisher() {

@Override
public void publishEvent(ApplicationEvent e) {
events.add(e);
if (e instanceof ConsumerStoppedEvent) {
stopLatch.countDown();
}
}

@Override
public void publishEvent(Object event) {
}

});
container.start();

Expand All @@ -134,6 +151,14 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
pf.destroy();
consumer.close();
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(events.stream()
.filter(ev -> ev instanceof ConsumerPausedEvent)
.collect(Collectors.toList()))
.hasSize(1);
assertThat(events.stream()
.filter(ev -> ev instanceof ConsumerResumedEvent)
.collect(Collectors.toList()))
.hasSize(1);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willThrow;
Expand Down Expand Up @@ -194,6 +195,7 @@ void rePauseOnRebalance() {
return records;
}).given(consumer).poll(any());
KafkaMessageListenerContainer<?, ?> container = mock(KafkaMessageListenerContainer.class);
given(container.getContainerFor(any(), anyInt())).willReturn(container);
given(container.isRunning()).willReturn(true);
eh.handleBatch(new RuntimeException(), records, consumer, container, () -> {
this.invoked++;
Expand Down