@@ -115,6 +115,7 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
115
115
116
116
private final KafkaListenerErrorHandler errorHandler ;
117
117
118
+ @ Nullable
118
119
private HandlerAdapter handlerMethod ;
119
120
120
121
private boolean isConsumerRecordList ;
@@ -216,7 +217,7 @@ public void setMessagingConverter(SmartMessageConverter messageConverter) {
216
217
}
217
218
218
219
/**
219
- * Returns the inferred type for conversion or, if null, the
220
+ * Return the inferred type for conversion or, if null, the
220
221
* {@link #setFallbackType(Class) fallbackType}.
221
222
* @return the type.
222
223
*/
@@ -245,7 +246,7 @@ public void setHandlerMethod(HandlerAdapter handlerMethod) {
245
246
}
246
247
247
248
public boolean isAsyncReplies () {
248
- return this .handlerMethod .isAsyncReplies ();
249
+ return this .handlerMethod != null && this . handlerMethod .isAsyncReplies ();
249
250
}
250
251
251
252
protected boolean isConsumerRecordList () {
@@ -262,7 +263,7 @@ public boolean isConversionNeeded() {
262
263
263
264
/**
264
265
* Set the topic to which to send any result from the method invocation.
265
- * May be a SpEL expression {@code !{...}} evaluated at runtime.
266
+ * Maybe a SpEL expression {@code !{...}} evaluated at runtime.
266
267
* @param replyTopicParam the topic or expression.
267
268
* @since 2.0
268
269
*/
@@ -334,7 +335,7 @@ protected boolean isSplitIterables() {
334
335
}
335
336
336
337
/**
337
- * Set to false to disable splitting {@link Iterable} reply values into separate
338
+ * Set to {@code false} to disable splitting {@link Iterable} reply values into separate
338
339
* records.
339
340
* @param splitIterables false to disable; default true.
340
341
* @since 2.3.5
@@ -406,6 +407,7 @@ protected final Object invokeHandler(Object data, @Nullable Acknowledgment ackno
406
407
if (ack == null && this .noOpAck ) {
407
408
ack = NO_OP_ACK ;
408
409
}
410
+ Assert .notNull (this .handlerMethod , "the 'handlerMethod' must not be null" );
409
411
try {
410
412
if (data instanceof List && !this .isConsumerRecordList ) {
411
413
return this .handlerMethod .invoke (message , ack , consumer );
@@ -545,7 +547,9 @@ private String evaluateTopic(Object request, Object source, Object result, @Null
545
547
* @since 2.1.3
546
548
*/
547
549
@ SuppressWarnings ("unchecked" )
548
- protected void sendResponse (Object result , String topic , @ Nullable Object source , boolean returnTypeMessage ) {
550
+ protected void sendResponse (Object result , @ Nullable String topic , @ Nullable Object source ,
551
+ boolean returnTypeMessage ) {
552
+
549
553
if (!returnTypeMessage && topic == null ) {
550
554
this .logger .debug (() -> "No replyTopic to handle the reply: " + result );
551
555
}
@@ -622,7 +626,7 @@ private void sendReplyForMessageSource(Object result, String topic, Message<?> s
622
626
})
623
627
.filter (e -> this .replyHeadersConfigurer .shouldCopy (e .getKey (), e .getValue ()))
624
628
.collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
625
- if (headersToCopy .size () > 0 ) {
629
+ if (! headersToCopy .isEmpty () ) {
626
630
builder .copyHeaders (headersToCopy );
627
631
}
628
632
headersToCopy = this .replyHeadersConfigurer .additionalHeaders ();
@@ -637,7 +641,9 @@ private void sendReplyForMessageSource(Object result, String topic, Message<?> s
637
641
this .replyTemplate .send (builder .build ());
638
642
}
639
643
640
- protected void asyncSuccess (@ Nullable Object result , String replyTopic , Message <?> source , boolean returnTypeMessage ) {
644
+ protected void asyncSuccess (@ Nullable Object result , String replyTopic , Message <?> source ,
645
+ boolean returnTypeMessage ) {
646
+
641
647
if (result == null ) {
642
648
if (this .logger .isDebugEnabled ()) {
643
649
this .logger .debug ("Async result is null, ignoring" );
0 commit comments