Skip to content

Commit e7c0d72

Browse files
authored
GH-2600: Fix Kotlin Listener List<Message<*>>
Resolves #2600 Also reproduced in Java with @KafkaListener(id = "kgh2600", topics = "kgh2600", batch = "true") void listen(List<? extends Message<?>> in) { System.out.println(in); } MMLA did not handle a `WildcardType` as the list element type. Also tested with reporter's Kotlin reproducer. **cherry-pick to 2.9.x** (pattern matching instanceof will need to be changed)
1 parent 83711f8 commit e7c0d72

File tree

2 files changed

+38
-6
lines changed

2 files changed

+38
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -683,11 +683,10 @@ else if (parameterizedType.getRawType().equals(List.class)
683683
this.isConsumerRecordList = paramType.equals(ConsumerRecord.class)
684684
|| (isSimpleListOfConsumerRecord(paramType)
685685
|| isListOfConsumerRecordUpperBounded(paramType));
686-
boolean messageHasGeneric = paramType instanceof ParameterizedType pType
687-
&& pType.getRawType().equals(Message.class);
688-
this.isMessageList = paramType.equals(Message.class) || messageHasGeneric;
689-
if (messageHasGeneric) {
690-
genericParameterType = ((ParameterizedType) paramType).getActualTypeArguments()[0];
686+
boolean messageWithGeneric = isMessageWithGeneric(paramType);
687+
this.isMessageList = paramType.equals(Message.class) || messageWithGeneric;
688+
if (messageWithGeneric) {
689+
genericParameterType = messagePayloadType(paramType);
691690
}
692691
}
693692
else {
@@ -697,6 +696,24 @@ else if (parameterizedType.getRawType().equals(List.class)
697696
return genericParameterType;
698697
}
699698

699+
private Type messagePayloadType(Type paramType) {
700+
if (paramType instanceof ParameterizedType pType) {
701+
return pType.getActualTypeArguments()[0];
702+
}
703+
else {
704+
return ((ParameterizedType) ((WildcardType) paramType).getUpperBounds()[0]).getActualTypeArguments()[0];
705+
}
706+
}
707+
708+
private boolean isMessageWithGeneric(Type paramType) {
709+
boolean messageHasGeneric = (paramType instanceof ParameterizedType pType
710+
&& pType.getRawType().equals(Message.class))
711+
|| (isWildCardWithUpperBound(paramType)
712+
&& ((WildcardType) paramType).getUpperBounds()[0] instanceof ParameterizedType wildCardZero
713+
&& wildCardZero.getRawType().equals(Message.class));
714+
return messageHasGeneric;
715+
}
716+
700717
private boolean isSimpleListOfConsumerRecord(Type paramType) {
701718
return paramType instanceof ParameterizedType pType && pType.getRawType().equals(ConsumerRecord.class);
702719
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,11 @@ public void testBatchMessages() throws Exception {
651651
assertThat(list.get(0)).isInstanceOf(Message.class);
652652
Message<?> m = (Message<?>) list.get(0);
653653
assertThat(m.getPayload()).isInstanceOf(String.class);
654+
assertThat(this.listener.latch14a.await(10, TimeUnit.SECONDS)).isTrue();
655+
assertThat(this.listener.payload2).isInstanceOf(List.class);
656+
list = (List<?>) this.listener.payload2;
657+
assertThat(list.size()).isGreaterThan(0);
658+
assertThat(list.get(0)).isInstanceOf(Message.class);
654659
}
655660

656661
@Test
@@ -1807,6 +1812,8 @@ static class Listener implements ConsumerSeekAware {
18071812

18081813
final CountDownLatch latch14 = new CountDownLatch(1);
18091814

1815+
final CountDownLatch latch14a = new CountDownLatch(1);
1816+
18101817
final CountDownLatch latch15 = new CountDownLatch(1);
18111818

18121819
final CountDownLatch latch16 = new CountDownLatch(1);
@@ -1845,6 +1852,8 @@ static class Listener implements ConsumerSeekAware {
18451852

18461853
volatile Object payload;
18471854

1855+
volatile Object payload2;
1856+
18481857
volatile Exception validationException;
18491858

18501859
volatile String convertedKey;
@@ -2083,6 +2092,12 @@ public void listen14(List<Message<?>> list) {
20832092
this.latch14.countDown();
20842093
}
20852094

2095+
@KafkaListener(id = "list5a", topics = "annotated18", containerFactory = "batchFactory")
2096+
public void listen14a(List<? extends Message<?>> list) {
2097+
this.payload2 = list;
2098+
this.latch14a.countDown();
2099+
}
2100+
20862101
@KafkaListener(id = "list6", topics = "annotated19", containerFactory = "batchManualFactory2")
20872102
public void listen15(List<Message<?>> list, Acknowledgment ack) {
20882103
this.payload = list;

0 commit comments

Comments
 (0)