Skip to content

Commit 573c297

Browse files
artembilangaryrussell
authored andcommitted
Fix SIK module for latest SK compatibility
Spring for Apache Kafka does not produce `ListenableFuture` anymore. * Rework `KafkaProducerMessageHandler` to deal with the `CompletableFuture` from now on * Add support for `CompletableFuture` replies handling into `AbstractMessageProducingHandler` * Remove redundant cast in the `DispatcherHasNoSubscribersTests`
1 parent 0422486 commit 573c297

File tree

4 files changed

+87
-91
lines changed

4 files changed

+87
-91
lines changed

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/channel/DispatcherHasNoSubscribersTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void testPtP() throws Exception {
7777
amqpChannel.setBeanFactory(mock(BeanFactory.class));
7878
amqpChannel.afterPropertiesSet();
7979

80-
MessageListener listener = (MessageListener) container.getMessageListener();
80+
MessageListener listener = container.getMessageListener();
8181

8282
assertThatExceptionOfType(MessageDeliveryException.class)
8383
.isThrownBy(() -> listener.onMessage(new Message("Hello world!".getBytes())))
@@ -101,7 +101,7 @@ public void testPubSub() {
101101
amqpChannel.afterPropertiesSet();
102102

103103
List<String> logList = insertMockLoggerInListener(amqpChannel);
104-
MessageListener listener = (MessageListener) container.getMessageListener();
104+
MessageListener listener = container.getMessageListener();
105105
listener.onMessage(new Message("Hello world!".getBytes()));
106106
verifyLogReceived(logList);
107107
}

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,7 +23,9 @@
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.Set;
26+
import java.util.concurrent.CompletableFuture;
2627
import java.util.concurrent.atomic.AtomicInteger;
28+
import java.util.function.BiConsumer;
2729

2830
import org.reactivestreams.Publisher;
2931

@@ -50,8 +52,6 @@
5052
import org.springframework.util.ObjectUtils;
5153
import org.springframework.util.StringUtils;
5254
import org.springframework.util.concurrent.ListenableFuture;
53-
import org.springframework.util.concurrent.ListenableFutureCallback;
54-
import org.springframework.util.concurrent.SettableListenableFuture;
5555

5656
import reactor.core.publisher.Flux;
5757
import reactor.core.publisher.Mono;
@@ -307,7 +307,10 @@ private void doProduceOutput(Message<?> requestMessage, MessageHeaders requestHe
307307
replyChannel = getOutputChannel();
308308
}
309309

310-
if (this.async && (reply instanceof ListenableFuture<?> || reply instanceof Publisher<?>)) {
310+
if (this.async && (reply instanceof ListenableFuture<?>
311+
|| reply instanceof CompletableFuture<?>
312+
|| reply instanceof Publisher<?>)) {
313+
311314
if (reply instanceof Publisher<?> &&
312315
replyChannel instanceof ReactiveStreamsSubscribableChannel) {
313316

@@ -349,12 +352,14 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) {
349352
}
350353

351354
private void asyncNonReactiveReply(Message<?> requestMessage, Object reply, @Nullable Object replyChannel) {
352-
ListenableFuture<?> future;
353-
if (reply instanceof ListenableFuture<?>) {
354-
future = (ListenableFuture<?>) reply;
355+
CompletableFuture<?> future;
356+
if (reply instanceof CompletableFuture<?>) {
357+
future = (CompletableFuture<?>) reply;
358+
}
359+
else if (reply instanceof ListenableFuture<?>) {
360+
future = ((ListenableFuture<?>) reply).completable();
355361
}
356362
else {
357-
SettableListenableFuture<Object> settableListenableFuture = new SettableListenableFuture<>();
358363
Mono<?> reactiveReply;
359364
ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(null, reply);
360365
if (adapter != null && adapter.isMultiValue()) {
@@ -363,12 +368,10 @@ private void asyncNonReactiveReply(Message<?> requestMessage, Object reply, @Nul
363368
else {
364369
reactiveReply = Mono.from((Publisher<?>) reply);
365370
}
366-
reactiveReply
367-
.publishOn(Schedulers.boundedElastic())
368-
.subscribe(settableListenableFuture::set, settableListenableFuture::setException);
369-
future = settableListenableFuture;
371+
372+
future = reactiveReply.publishOn(Schedulers.boundedElastic()).toFuture();
370373
}
371-
future.addCallback(new ReplyFutureCallback(requestMessage, replyChannel));
374+
future.whenComplete(new ReplyFutureCallback(requestMessage, replyChannel));
372375
}
373376

374377
private Object getOutputChannelFromRoutingSlip(Object reply, Message<?> requestMessage, List<?> routingSlip,
@@ -517,7 +520,7 @@ protected Object resolveErrorChannel(final MessageHeaders requestHeaders) {
517520
return errorChannel;
518521
}
519522

520-
private final class ReplyFutureCallback implements ListenableFutureCallback<Object> {
523+
private final class ReplyFutureCallback implements BiConsumer<Object, Throwable> {
521524

522525
private final Message<?> requestMessage;
523526

@@ -529,29 +532,32 @@ private final class ReplyFutureCallback implements ListenableFutureCallback<Obje
529532
this.replyChannel = replyChannel;
530533
}
531534

532-
533535
@Override
534-
public void onSuccess(Object result) {
535-
Message<?> replyMessage = null;
536-
try {
537-
replyMessage = createOutputMessage(result, this.requestMessage.getHeaders());
538-
sendOutput(replyMessage, this.replyChannel, false);
539-
}
540-
catch (Exception ex) {
541-
Exception exceptionToLogAndSend = ex;
542-
if (!(ex instanceof MessagingException)) { // NOSONAR
543-
exceptionToLogAndSend = new MessageHandlingException(this.requestMessage, ex);
544-
if (replyMessage != null) {
545-
exceptionToLogAndSend = new MessagingException(replyMessage, exceptionToLogAndSend);
536+
public void accept(Object result, Throwable exception) {
537+
if (exception == null) {
538+
Message<?> replyMessage = null;
539+
try {
540+
replyMessage = createOutputMessage(result, this.requestMessage.getHeaders());
541+
sendOutput(replyMessage, this.replyChannel, false);
542+
}
543+
catch (Exception ex) {
544+
Exception exceptionToLogAndSend = ex;
545+
if (!(ex instanceof MessagingException)) { // NOSONAR
546+
exceptionToLogAndSend = new MessageHandlingException(this.requestMessage, ex);
547+
if (replyMessage != null) {
548+
exceptionToLogAndSend = new MessagingException(replyMessage, exceptionToLogAndSend);
549+
}
546550
}
551+
logger.error(exceptionToLogAndSend, () -> "Failed to send async reply: " + result.toString());
552+
onFailure(exceptionToLogAndSend);
547553
}
548-
logger.error(exceptionToLogAndSend, () -> "Failed to send async reply: " + result.toString());
549-
onFailure(exceptionToLogAndSend);
554+
}
555+
else {
556+
onFailure(exception);
550557
}
551558
}
552559

553-
@Override
554-
public void onFailure(Throwable ex) {
560+
private void onFailure(Throwable ex) {
555561
sendErrorMessage(this.requestMessage, ex);
556562
}
557563

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

Lines changed: 42 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323
import java.util.Map;
2424
import java.util.Set;
2525
import java.util.TreeSet;
26+
import java.util.concurrent.CompletableFuture;
2627
import java.util.concurrent.ExecutionException;
2728
import java.util.concurrent.Future;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.TimeoutException;
3031
import java.util.concurrent.atomic.AtomicBoolean;
3132

32-
import org.apache.kafka.clients.consumer.ConsumerRecord;
3333
import org.apache.kafka.clients.producer.ProducerConfig;
3434
import org.apache.kafka.clients.producer.ProducerRecord;
3535
import org.apache.kafka.common.TopicPartition;
@@ -69,9 +69,6 @@
6969
import org.springframework.messaging.MessageHeaders;
7070
import org.springframework.util.Assert;
7171
import org.springframework.util.StringUtils;
72-
import org.springframework.util.concurrent.ListenableFuture;
73-
import org.springframework.util.concurrent.ListenableFutureCallback;
74-
import org.springframework.util.concurrent.SettableListenableFuture;
7572

7673
/**
7774
* A Message Handler for Apache Kafka; when supplied with a {@link ReplyingKafkaTemplate} it is used as
@@ -174,7 +171,7 @@ public KafkaProducerMessageHandler(final KafkaTemplate<K, V> kafkaTemplate) {
174171
if (this.isGateway) {
175172
setAsync(true);
176173
updateNotPropagatedHeaders(
177-
new String[]{KafkaHeaders.TOPIC, KafkaHeaders.PARTITION, KafkaHeaders.KEY}, false);
174+
new String[]{ KafkaHeaders.TOPIC, KafkaHeaders.PARTITION, KafkaHeaders.KEY }, false);
178175
}
179176
if (JacksonPresent.isJackson2Present()) {
180177
this.headerMapper = new DefaultKafkaHeaderMapper();
@@ -498,7 +495,7 @@ protected Object handleRequestMessage(final Message<?> message) {
498495
if (futureToken != null) {
499496
producerRecord.headers().remove(KafkaIntegrationHeaders.FUTURE_TOKEN);
500497
}
501-
ListenableFuture<SendResult<K, V>> sendFuture;
498+
CompletableFuture<SendResult<K, V>> sendFuture;
502499
RequestReplyFuture<K, V, Object> gatewayFuture = null;
503500
if (this.isGateway && (!preBuilt || producerRecord.headers().lastHeader(KafkaHeaders.REPLY_TOPIC) == null)) {
504501
producerRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, getReplyTopic(message)));
@@ -530,7 +527,7 @@ protected Object handleRequestMessage(final Message<?> message) {
530527
return processReplyFuture(gatewayFuture);
531528
}
532529

533-
private void sendFutureIfRequested(ListenableFuture<SendResult<K, V>> sendFuture, Object futureToken) {
530+
private void sendFutureIfRequested(CompletableFuture<SendResult<K, V>> sendFuture, Object futureToken) {
534531

535532
if (futureToken != null) {
536533
MessageChannel futures = getFuturesChannel();
@@ -662,31 +659,26 @@ private void determineValidReplyTopicsAndPartitions() {
662659
}
663660

664661
public void processSendResult(final Message<?> message, final ProducerRecord<K, V> producerRecord,
665-
ListenableFuture<SendResult<K, V>> future, MessageChannel metadataChannel)
662+
CompletableFuture<SendResult<K, V>> future, MessageChannel metadataChannel)
666663
throws InterruptedException, ExecutionException {
667664

668665
final MessageChannel failureChannel = getSendFailureChannel();
669666
if (failureChannel != null || metadataChannel != null) {
670-
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() { // NOSONAR
671-
672-
@Override
673-
public void onSuccess(SendResult<K, V> result) {
667+
future.whenComplete((sendResult, exception) -> {
668+
if (exception == null) {
674669
if (metadataChannel != null) {
675670
KafkaProducerMessageHandler.this.messagingTemplate.send(metadataChannel,
676-
getMessageBuilderFactory().fromMessage(message)
677-
.setHeader(KafkaHeaders.RECORD_METADATA, result.getRecordMetadata()).build());
671+
getMessageBuilderFactory()
672+
.fromMessage(message)
673+
.setHeader(KafkaHeaders.RECORD_METADATA, sendResult.getRecordMetadata())
674+
.build());
678675
}
679676
}
680-
681-
@Override
682-
public void onFailure(Throwable ex) {
683-
if (failureChannel != null) {
684-
KafkaProducerMessageHandler.this.messagingTemplate.send(failureChannel,
685-
KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage(
686-
new KafkaSendFailureException(message, producerRecord, ex), null));
687-
}
677+
else if (failureChannel != null) {
678+
KafkaProducerMessageHandler.this.messagingTemplate.send(failureChannel,
679+
KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage(
680+
new KafkaSendFailureException(message, producerRecord, exception), null));
688681
}
689-
690682
});
691683
}
692684

@@ -720,51 +712,47 @@ private Future<?> processReplyFuture(@Nullable RequestReplyFuture<?, ?, Object>
720712
return new ConvertingReplyFuture(future);
721713
}
722714

723-
private final class ConvertingReplyFuture extends SettableListenableFuture<Object> {
715+
private final class ConvertingReplyFuture extends CompletableFuture<Object> {
724716

725717
ConvertingReplyFuture(RequestReplyFuture<?, ?, Object> future) {
726718
addCallback(future);
727719
}
728720

729721
private void addCallback(final RequestReplyFuture<?, ?, Object> future) {
730-
future.addCallback(new ListenableFutureCallback<ConsumerRecord<?, Object>>() { // NOSONAR
731-
732-
@Override
733-
public void onSuccess(ConsumerRecord<?, Object> result) {
722+
future.whenComplete((result, exception) -> {
723+
if (exception == null) {
734724
try {
735-
set(dontLeakHeaders(KafkaProducerMessageHandler.this.replyMessageConverter.toMessage(result,
736-
null, null, KafkaProducerMessageHandler.this.replyPayloadType)));
737-
}
738-
catch (Exception e) {
739-
setException(e);
740-
}
741-
}
742-
743-
private Message<?> dontLeakHeaders(Message<?> message) {
744-
if (message.getHeaders() instanceof KafkaMessageHeaders) {
745-
Map<String, Object> headers = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
746-
headers.remove(KafkaHeaders.CORRELATION_ID);
747-
headers.remove(KafkaHeaders.REPLY_TOPIC);
748-
headers.remove(KafkaHeaders.REPLY_PARTITION);
749-
return message;
725+
complete(dontLeakHeaders(
726+
KafkaProducerMessageHandler.this.replyMessageConverter.toMessage(result, null, null,
727+
KafkaProducerMessageHandler.this.replyPayloadType)));
750728
}
751-
else {
752-
return getMessageBuilderFactory().fromMessage(message)
753-
.removeHeader(KafkaHeaders.CORRELATION_ID)
754-
.removeHeader(KafkaHeaders.REPLY_TOPIC)
755-
.removeHeader(KafkaHeaders.REPLY_PARTITION)
756-
.build();
729+
catch (Exception ex) {
730+
completeExceptionally(ex);
757731
}
758732
}
759-
760-
@Override
761-
public void onFailure(Throwable ex) {
762-
setException(ex);
733+
else {
734+
completeExceptionally(exception);
763735
}
764-
765736
});
766737
}
767738

739+
private Message<?> dontLeakHeaders(Message<?> message) {
740+
if (message.getHeaders() instanceof KafkaMessageHeaders) {
741+
Map<String, Object> headers = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
742+
headers.remove(KafkaHeaders.CORRELATION_ID);
743+
headers.remove(KafkaHeaders.REPLY_TOPIC);
744+
headers.remove(KafkaHeaders.REPLY_PARTITION);
745+
return message;
746+
}
747+
else {
748+
return getMessageBuilderFactory().fromMessage(message)
749+
.removeHeader(KafkaHeaders.CORRELATION_ID)
750+
.removeHeader(KafkaHeaders.REPLY_TOPIC)
751+
.removeHeader(KafkaHeaders.REPLY_PARTITION)
752+
.build();
753+
}
754+
}
755+
768756
}
769757

770758
/**

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.Collections;
4040
import java.util.HashMap;
4141
import java.util.Map;
42+
import java.util.concurrent.CompletableFuture;
4243
import java.util.concurrent.CountDownLatch;
4344
import java.util.concurrent.TimeUnit;
4445
import java.util.concurrent.atomic.AtomicBoolean;
@@ -309,13 +310,14 @@ void testOutboundWithAsyncResults() {
309310

310311
final RuntimeException fooException = new RuntimeException("foo");
311312

312-
handler = new KafkaProducerMessageHandler<>(new KafkaTemplate<Integer, String>(producerFactory) {
313+
handler = new KafkaProducerMessageHandler<>(new KafkaTemplate<>(producerFactory) {
313314

314315
@Override
315-
protected ListenableFuture<SendResult<Integer, String>> doSend(
316+
protected CompletableFuture<SendResult<Integer, String>> doSend(
316317
ProducerRecord<Integer, String> producerRecord) {
317-
SettableListenableFuture<SendResult<Integer, String>> future = new SettableListenableFuture<>();
318-
future.setException(fooException);
318+
319+
CompletableFuture<SendResult<Integer, String>> future = new CompletableFuture<>();
320+
future.completeExceptionally(fooException);
319321
return future;
320322
}
321323

0 commit comments

Comments
 (0)