77
77
import org .springframework .util .ClassUtils ;
78
78
import org .springframework .util .ObjectUtils ;
79
79
import org .springframework .util .StringUtils ;
80
+ import org .springframework .util .TypeUtils ;
80
81
81
82
/**
82
83
* An abstract {@link org.springframework.kafka.listener.MessageListener} adapter
@@ -320,6 +321,20 @@ public void setBeanResolver(BeanResolver beanResolver) {
320
321
this .evaluationContext .addPropertyAccessor (new MapAccessor ());
321
322
}
322
323
324
+ /**
325
+ * Set the retry callback for failures of both {@link CompletableFuture} and {@link Mono}.
326
+ * {@link MessagingMessageListenerAdapter#asyncFailure(Object, Acknowledgment, Consumer, Throwable, Message)}
327
+ * will invoke {@link MessagingMessageListenerAdapter#asyncRetryCallback} when
328
+ * {@link CompletableFuture} or {@link Mono} fails to complete.
329
+ * @param asyncRetryCallback the callback for async retry.
330
+ * @since 3.3
331
+ */
332
+ public void setCallbackForAsyncFailure (
333
+ @ Nullable BiConsumer <ConsumerRecord <K , V >, RuntimeException > asyncRetryCallback ) {
334
+
335
+ this .asyncRetryCallback = asyncRetryCallback ;
336
+ }
337
+
323
338
protected boolean isMessageList () {
324
339
return this .isMessageList ;
325
340
}
@@ -392,6 +407,7 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC
392
407
393
408
protected Message <?> toMessagingMessage (ConsumerRecord <K , V > cRecord , @ Nullable Acknowledgment acknowledgment ,
394
409
Consumer <?, ?> consumer ) {
410
+
395
411
return getMessageConverter ().toMessage (cRecord , acknowledgment , consumer , getType ());
396
412
}
397
413
@@ -875,70 +891,47 @@ else if (isAck || isKotlinContinuation || isConsumer || annotationHeaderIsGroupI
875
891
private Type extractGenericParameterTypFromMethodParameter (MethodParameter methodParameter ) {
876
892
Type genericParameterType = methodParameter .getGenericParameterType ();
877
893
if (genericParameterType instanceof ParameterizedType parameterizedType ) {
878
- if (parameterizedType .getRawType ().equals (Message .class )) {
894
+ Type rawType = parameterizedType .getRawType ();
895
+ if (rawType .equals (Message .class )) {
879
896
genericParameterType = parameterizedType .getActualTypeArguments ()[0 ];
880
897
}
881
- else if (parameterizedType .getRawType ().equals (List .class )
882
- && parameterizedType .getActualTypeArguments ().length == 1 ) {
883
-
884
- Type paramType = getTypeFromWildCardWithUpperBound (parameterizedType .getActualTypeArguments ()[0 ]);
885
- this .isConsumerRecordList = parameterIsType (paramType , ConsumerRecord .class );
886
- boolean messageWithGeneric = rawByParameterIsType (paramType , Message .class );
887
- this .isMessageList = Message .class .equals (paramType ) || messageWithGeneric ;
888
- if (messageWithGeneric ) {
898
+ else if (rawType .equals (List .class ) && parameterizedType .getActualTypeArguments ().length == 1 ) {
899
+ Type paramType = parameterizedType .getActualTypeArguments ()[0 ];
900
+ boolean messageHasGeneric = paramType instanceof ParameterizedType pType
901
+ && pType .getRawType ().equals (Message .class );
902
+ this .isMessageList = TypeUtils .isAssignable (paramType , Message .class ) || messageHasGeneric ;
903
+ this .isConsumerRecordList = TypeUtils .isAssignable (paramType , ConsumerRecord .class );
904
+ if (messageHasGeneric ) {
889
905
genericParameterType = ((ParameterizedType ) paramType ).getActualTypeArguments ()[0 ];
890
906
}
891
907
}
892
908
else {
893
- this .isConsumerRecords = parameterizedType . getRawType () .equals (ConsumerRecords .class );
909
+ this .isConsumerRecords = rawType .equals (ConsumerRecords .class );
894
910
}
895
911
}
896
912
return genericParameterType ;
897
913
}
898
914
899
- private boolean annotationHeaderIsGroupId (MethodParameter methodParameter ) {
915
+ private static boolean annotationHeaderIsGroupId (MethodParameter methodParameter ) {
900
916
Header header = methodParameter .getParameterAnnotation (Header .class );
901
917
return header != null && KafkaHeaders .GROUP_ID .equals (header .value ());
902
918
}
903
919
904
- private Type getTypeFromWildCardWithUpperBound (Type paramType ) {
905
- if (paramType instanceof WildcardType wcType
906
- && wcType .getUpperBounds () != null
907
- && wcType .getUpperBounds ().length > 0 ) {
908
- paramType = wcType .getUpperBounds ()[0 ];
909
- }
910
- return paramType ;
911
- }
912
-
913
- private boolean isMessageWithNoTypeInfo (Type parameterType ) {
920
+ private static boolean isMessageWithNoTypeInfo (Type parameterType ) {
914
921
if (parameterType instanceof ParameterizedType pType && pType .getRawType ().equals (Message .class )) {
915
922
return pType .getActualTypeArguments ()[0 ] instanceof WildcardType ;
916
923
}
917
924
return Message .class .equals (parameterType ); // could be Message without a generic type
918
925
}
919
926
920
- private boolean parameterIsType (Type parameterType , Type type ) {
927
+ private static boolean parameterIsType (Type parameterType , Type type ) {
921
928
return parameterType .equals (type ) || rawByParameterIsType (parameterType , type );
922
929
}
923
930
924
- private boolean rawByParameterIsType (Type parameterType , Type type ) {
931
+ private static boolean rawByParameterIsType (Type parameterType , Type type ) {
925
932
return parameterType instanceof ParameterizedType pType && pType .getRawType ().equals (type );
926
933
}
927
934
928
- /**
929
- * Set the retry callback for failures of both {@link CompletableFuture} and {@link Mono}.
930
- * {@link MessagingMessageListenerAdapter#asyncFailure(Object, Acknowledgment, Consumer, Throwable, Message)}
931
- * will invoke {@link MessagingMessageListenerAdapter#asyncRetryCallback} when
932
- * {@link CompletableFuture} or {@link Mono} fails to complete.
933
- * @param asyncRetryCallback the callback for async retry.
934
- * @since 3.3
935
- */
936
- public void setCallbackForAsyncFailure (
937
- @ Nullable BiConsumer <ConsumerRecord <K , V >, RuntimeException > asyncRetryCallback ) {
938
-
939
- this .asyncRetryCallback = asyncRetryCallback ;
940
- }
941
-
942
935
/**
943
936
* Root object for reply expression evaluation.
944
937
* @param request the request.
@@ -947,6 +940,7 @@ public void setCallbackForAsyncFailure(
947
940
* @since 2.0
948
941
*/
949
942
public record ReplyExpressionRoot (Object request , Object source , Object result ) {
943
+
950
944
}
951
945
952
946
static class NoOpAck implements Acknowledgment {
0 commit comments