Skip to content

Commit ac4aa7c

Browse files
authored
GH-2941: Minor changes in MessageListenerAdapter
Fixes: #2941 minor adjustment at `MessagingMessageListenerAdapter` (#2941)
1 parent a57c319 commit ac4aa7c

File tree

3 files changed

+9
-15
lines changed

3 files changed

+9
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -63,7 +63,7 @@ public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessage
6363

6464
private BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter();
6565

66-
private KafkaListenerErrorHandler errorHandler;
66+
private final KafkaListenerErrorHandler errorHandler;
6767

6868
private BatchToRecordAdapter<K, V> batchToRecordAdapter;
6969

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -529,26 +529,20 @@ private Message<?> checkHeaders(Message<?> reply, @Nullable String topic, @Nulla
529529

530530
@SuppressWarnings("unchecked")
531531
private void sendSingleResult(Object result, String topic, @Nullable Object source) {
532-
byte[] correlationId = null;
533-
boolean sourceIsMessage = source instanceof Message;
534-
if (sourceIsMessage
535-
&& getCorrelation((Message<?>) source) != null) {
536-
correlationId = getCorrelation((Message<?>) source);
537-
}
538-
if (sourceIsMessage) {
539-
sendReplyForMessageSource(result, topic, source, correlationId);
532+
if (source instanceof Message<?> message) {
533+
sendReplyForMessageSource(result, topic, message, getCorrelation(message));
540534
}
541535
else {
542536
this.replyTemplate.send(topic, result);
543537
}
544538
}
545539

546540
@SuppressWarnings("unchecked")
547-
private void sendReplyForMessageSource(Object result, String topic, Object source, @Nullable byte[] correlationId) {
541+
private void sendReplyForMessageSource(Object result, String topic, Message<?> source, @Nullable byte[] correlationId) {
548542
MessageBuilder<Object> builder = MessageBuilder.withPayload(result)
549543
.setHeader(KafkaHeaders.TOPIC, topic);
550544
if (this.replyHeadersConfigurer != null) {
551-
Map<String, Object> headersToCopy = ((Message<?>) source).getHeaders().entrySet().stream()
545+
Map<String, Object> headersToCopy = source.getHeaders().entrySet().stream()
552546
.filter(e -> {
553547
String key = e.getKey();
554548
return !key.equals(MessageHeaders.ID) && !key.equals(MessageHeaders.TIMESTAMP)
@@ -568,7 +562,7 @@ private void sendReplyForMessageSource(Object result, String topic, Object sourc
568562
if (correlationId != null) {
569563
builder.setHeader(this.correlationHeaderName, correlationId);
570564
}
571-
setPartition(builder, ((Message<?>) source));
565+
setPartition(builder, source);
572566
this.replyTemplate.send(builder.build());
573567
}
574568

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -53,7 +53,7 @@
5353
public class RecordMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
5454
implements AcknowledgingConsumerAwareMessageListener<K, V> {
5555

56-
private KafkaListenerErrorHandler errorHandler;
56+
private final KafkaListenerErrorHandler errorHandler;
5757

5858
public RecordMessagingMessageListenerAdapter(Object bean, Method method) {
5959
this(bean, method, null);

0 commit comments

Comments
 (0)