Skip to content

Commit 6e5d4b9

Browse files
garyrussellartembilan
authored andcommitted
GH-1872: @KafkaListener Override Listener Type
Resolves #1872
1 parent a6616ba commit 6e5d4b9

File tree

7 files changed

+89
-12
lines changed

7 files changed

+89
-12
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1524,6 +1524,9 @@ public KafkaListenerContainerFactory<?, ?> batchFactory() {
15241524
----
15251525
====
15261526

1527+
NOTE: Starting with version 2.8, you can override the factory's `batchListener` propery using the `batch` property on the `@KafkaListener` annotation.
1528+
This, together with the changes to <<error-handlers>> allows the same factory to be used for both record and batch listeners.
1529+
15271530
The following example shows how to receive a list of payloads:
15281531

15291532
====

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ The listener container can now be configured to accept manual offset commits out
1515
The container will defer the commit until the missing offset is acknowledged.
1616
See <<ooo-commits>> for more information.
1717

18+
[[x28-batch-overrude]]
19+
==== `@KafkaListener` Changes
20+
21+
It is now possible to specify whether the listener method is a batch listener on the method itself.
22+
This allows the same container factory to be used for both record and batch listeners.
23+
See <<batch-listeners>> for more information.
24+
1825
[[x28-template]]
1926
==== `KafkaTemplate` Changes
2027

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,4 +283,18 @@
283283
*/
284284
String contentTypeConverter() default "";
285285

286+
/**
287+
* Override the container factory's {@code batchListener} property. The listener
288+
* method signature should receive a {@code List<?>}; refer to the reference
289+
* documentation. This allows a single container factory to be used for both record
290+
* and batch listeners; previously separate container factories were required.
291+
* @return "true" for the annotated method to be a batch listener or "false" for a
292+
* record listener. If not set, the container factory setting is used. SpEL and
293+
* property placeholders are not supported because the listener type cannot be
294+
* variable.
295+
* @since 2.8
296+
* @see Boolean#parseBoolean(String)
297+
*/
298+
String batch() default "";
299+
286300
}

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -560,12 +560,16 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
560560
}
561561
}
562562

563-
private void processKafkaListenerAnnotationForRetryTopic(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean) {
563+
private void processKafkaListenerAnnotationForRetryTopic(MethodKafkaListenerEndpoint<?, ?> endpoint,
564+
KafkaListener kafkaListener, Object bean) {
565+
564566
processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean);
565567
processKafkaListenerEndpointAfterRegistration(endpoint, kafkaListener);
566568
}
567569

568-
private void processKafkaListenerAnnotationBeforeRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean) {
570+
private void processKafkaListenerAnnotationBeforeRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint,
571+
KafkaListener kafkaListener, Object bean) {
572+
569573
endpoint.setBean(bean);
570574
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
571575
endpoint.setId(getEndpointId(kafkaListener));
@@ -591,9 +595,14 @@ private void processKafkaListenerAnnotationBeforeRegistration(MethodKafkaListene
591595
}
592596
resolveKafkaProperties(endpoint, kafkaListener.properties());
593597
endpoint.setSplitIterables(kafkaListener.splitIterables());
598+
if (StringUtils.hasText(kafkaListener.batch())) {
599+
endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch()));
600+
}
594601
}
595602

596-
private void processKafkaListenerEndpointAfterRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener) {
603+
private void processKafkaListenerEndpointAfterRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint,
604+
KafkaListener kafkaListener) {
605+
597606
endpoint.setBeanFactory(this.beanFactory);
598607
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
599608
if (StringUtils.hasText(errorHandlerBeanName)) {

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,10 +395,14 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint)
395395
.acceptIfNotNull(this.retryTemplate, aklEndpoint::setRetryTemplate)
396396
.acceptIfNotNull(this.recoveryCallback, aklEndpoint::setRecoveryCallback)
397397
.acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry)
398-
.acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener)
399398
.acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate)
400399
.acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer)
401400
.acceptIfNotNull(this.batchToRecordAdapter, aklEndpoint::setBatchToRecordAdapter);
401+
if (aklEndpoint.getBatchListener() == null) {
402+
JavaUtils.INSTANCE
403+
.acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener);
404+
405+
}
402406
}
403407

404408
/**

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
100100

101101
private boolean statefulRetry;
102102

103-
private boolean batchListener;
103+
private Boolean batchListener;
104104

105105
private KafkaTemplate<?, ?> replyTemplate;
106106

@@ -263,6 +263,17 @@ public void setGroup(String group) {
263263
* @since 1.1
264264
*/
265265
public boolean isBatchListener() {
266+
return this.batchListener == null ? false : this.batchListener;
267+
}
268+
269+
/**
270+
* Return the current batch listener flag for this endpoint, or null if not explicitly
271+
* set.
272+
* @return the batch listener flag.
273+
* @since 2.8
274+
*/
275+
@Nullable
276+
public Boolean getBatchListener() {
266277
return this.batchListener;
267278
}
268279

@@ -509,9 +520,10 @@ private void setupMessageListener(MessageListenerContainer container,
509520
}
510521
adapter.setSplitIterables(this.splitIterables);
511522
Object messageListener = adapter;
523+
boolean isBatchListener = isBatchListener();
512524
Assert.state(messageListener != null,
513525
() -> "Endpoint [" + this + "] must provide a non null message listener");
514-
Assert.state(this.retryTemplate == null || !this.batchListener,
526+
Assert.state(this.retryTemplate == null || !isBatchListener,
515527
"A 'RetryTemplate' is not supported with a batch listener; consider configuring the container "
516528
+ "with a suitably configured 'SeekToCurrentBatchErrorHandler' instead");
517529
if (this.retryTemplate != null) {
@@ -520,9 +532,9 @@ private void setupMessageListener(MessageListenerContainer container,
520532
this.retryTemplate, this.recoveryCallback, this.statefulRetry);
521533
}
522534
if (this.recordFilterStrategy != null) {
523-
if (this.batchListener) {
535+
if (isBatchListener) {
524536
if (((MessagingMessageListenerAdapter<K, V>) messageListener).isConsumerRecords()) {
525-
this.logger.warn(() -> "Filter strategy ignored when consuming 'ConsumerRecords'"
537+
this.logger.warn(() -> "Filter strategy ignored when consuming 'ConsumerRecords' instead of a List"
526538
+ (this.id != null ? " id: " + this.id : ""));
527539
}
528540
else {

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import static org.mockito.Mockito.mock;
2828
import static org.mockito.Mockito.spy;
2929

30+
import java.io.PrintWriter;
31+
import java.io.StringWriter;
3032
import java.lang.reflect.Type;
3133
import java.time.Duration;
3234
import java.util.Collection;
@@ -104,6 +106,7 @@
104106
import org.springframework.kafka.listener.ConsumerSeekAware;
105107
import org.springframework.kafka.listener.ContainerProperties;
106108
import org.springframework.kafka.listener.ContainerProperties.AckMode;
109+
import org.springframework.kafka.listener.DefaultErrorHandler;
107110
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
108111
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
109112
import org.springframework.kafka.listener.ListenerExecutionFailedException;
@@ -261,9 +264,12 @@ public void testAnonymous() {
261264
public void testSimple() throws Exception {
262265
this.recordFilter.called = false;
263266
template.send("annotated1", 0, "foo");
267+
template.send("annotated1", 0, "bar");
264268
assertThat(this.listener.latch1.await(60, TimeUnit.SECONDS)).isTrue();
265269
assertThat(this.config.globalErrorThrowable).isNotNull();
266270
assertThat(this.listener.receivedGroupId).isEqualTo("foo");
271+
assertThat(this.listener.latch1Batch.await(60, TimeUnit.SECONDS)).isTrue();
272+
assertThat(this.listener.batchOverrideStackTrace).contains("BatchMessagingMessageListenerAdapter");
267273

268274
template.send("annotated2", 0, 123, "foo");
269275
assertThat(this.listener.latch2.await(60, TimeUnit.SECONDS)).isTrue();
@@ -1010,11 +1016,20 @@ public PlatformTransactionManager transactionManager() {
10101016
factory.setConsumerFactory(consumerFactory());
10111017
factory.setRecordFilterStrategy(recordFilter());
10121018
factory.setReplyTemplate(partitionZeroReplyTemplate());
1013-
factory.setErrorHandler((ConsumerAwareErrorHandler) (t, d, c) -> {
1014-
this.globalErrorThrowable = t;
1015-
c.seek(new org.apache.kafka.common.TopicPartition(d.topic(), d.partition()), d.offset());
1019+
factory.setCommonErrorHandler(new DefaultErrorHandler() {
1020+
1021+
@Override
1022+
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
1023+
Consumer<?, ?> consumer, MessageListenerContainer container) {
1024+
1025+
Config.this.globalErrorThrowable = thrownException;
1026+
super.handleRemaining(thrownException, records, consumer, container);
1027+
}
1028+
10161029
});
10171030
factory.getContainerProperties().setMicrometerTags(Collections.singletonMap("extraTag", "foo"));
1031+
// ensure annotation wins, even with explicitly set here
1032+
factory.setBatchListener(false);
10181033
return factory;
10191034
}
10201035

@@ -1671,7 +1686,9 @@ static class Listener implements ConsumerSeekAware {
16711686

16721687
private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
16731688

1674-
final CountDownLatch latch1 = new CountDownLatch(1);
1689+
final CountDownLatch latch1 = new CountDownLatch(2);
1690+
1691+
final CountDownLatch latch1Batch = new CountDownLatch(2);
16751692

16761693
final CountDownLatch latch2 = new CountDownLatch(2); // seek
16771694

@@ -1783,6 +1800,8 @@ static class Listener implements ConsumerSeekAware {
17831800

17841801
volatile Foo contentFoo;
17851802

1803+
volatile String batchOverrideStackTrace;
1804+
17861805
@KafkaListener(id = "manualStart", topics = "manualStart",
17871806
containerFactory = "kafkaAutoStartFalseListenerContainerFactory",
17881807
properties = { ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + ":301000",
@@ -1801,6 +1820,15 @@ public void listen1(String foo, @Header(value = KafkaHeaders.GROUP_ID, required
18011820
this.latch1.countDown();
18021821
}
18031822

1823+
@KafkaListener(id = "fooBatch", topics = "annotated1", batch = "true")
1824+
public void listen1Batch(List<String> foo) {
1825+
StringWriter stringWriter = new StringWriter();
1826+
PrintWriter printWriter = new PrintWriter(stringWriter, true);
1827+
new RuntimeException().printStackTrace(printWriter);
1828+
this.batchOverrideStackTrace = stringWriter.getBuffer().toString();
1829+
foo.forEach(s -> this.latch1Batch.countDown());
1830+
}
1831+
18041832
@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}")
18051833
public void listen2(@Payload String foo,
18061834
@Header(KafkaHeaders.GROUP_ID) String groupId,

0 commit comments

Comments
 (0)