diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/json/JacksonJsonUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/support/json/JacksonJsonUtils.java index 10120a46a10..72b8d0f8764 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/json/JacksonJsonUtils.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/json/JacksonJsonUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.integration.support.json; import java.io.IOException; +import java.io.Serial; import java.util.Arrays; import java.util.Collection; import java.util.LinkedHashSet; @@ -51,6 +52,20 @@ */ public final class JacksonJsonUtils { + /** + * The packages to trust on JSON deserialization by default. + */ + public static final List DEFAULT_TRUSTED_PACKAGES = + List.of( + "java.util", + "java.lang", + "org.springframework.messaging.support", + "org.springframework.integration.support", + "org.springframework.integration.message", + "org.springframework.integration.store", + "org.springframework.integration.history" + ); + private JacksonJsonUtils() { } @@ -99,17 +114,16 @@ public static ObjectMapper messagingAwareMapper(String... trustedPackages) { /** * An implementation of {@link ObjectMapper.DefaultTypeResolverBuilder} - * that wraps a default {@link TypeIdResolver} to the {@link AllowlistTypeIdResolver}. + * that wraps a default {@link TypeIdResolver} to the {@link AllowListTypeIdResolver}. * * @author Rob Winch * @author Artem Bilan * @author Filip Hanik * @author Gary Russell - * - * @since 4.3.11 */ private static final class AllowListTypeResolverBuilder extends ObjectMapper.DefaultTypeResolverBuilder { + @Serial private static final long serialVersionUID = 1L; private final String[] trustedPackages; @@ -133,8 +147,9 @@ protected TypeIdResolver idResolver(MapperConfig config, JavaType baseType, PolymorphicTypeValidator subtypeValidator, Collection subtypes, boolean forSer, boolean forDeser) { + TypeIdResolver result = super.idResolver(config, baseType, subtypeValidator, subtypes, forSer, forDeser); - return new AllowlistTypeIdResolver(result, this.trustedPackages); + return new AllowListTypeIdResolver(result, this.trustedPackages); } } @@ -146,27 +161,14 @@ protected TypeIdResolver idResolver(MapperConfig config, * * @author Rob Winch * @author Artem Bilan - * - * @since 4.3.11 */ - private static final class AllowlistTypeIdResolver implements TypeIdResolver { - - private static final List TRUSTED_PACKAGES = - Arrays.asList( - "java.util", - "java.lang", - "org.springframework.messaging.support", - "org.springframework.integration.support", - "org.springframework.integration.message", - "org.springframework.integration.store", - "org.springframework.integration.history" - ); + private static final class AllowListTypeIdResolver implements TypeIdResolver { private final TypeIdResolver delegate; - private final Set trustedPackages = new LinkedHashSet<>(TRUSTED_PACKAGES); + private final Set trustedPackages = new LinkedHashSet<>(DEFAULT_TRUSTED_PACKAGES); - AllowlistTypeIdResolver(TypeIdResolver delegate, String... trustedPackages) { + AllowListTypeIdResolver(TypeIdResolver delegate, String... trustedPackages) { this.delegate = delegate; if (trustedPackages != null) { for (String trustedPackage : trustedPackages) { diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java index 7fd35db6b06..ca2fb65eb47 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 the original author or authors. + * Copyright 2020-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import org.springframework.integration.dispatcher.MessageDispatcher; import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy; import org.springframework.integration.dispatcher.UnicastingDispatcher; +import org.springframework.integration.support.json.JacksonJsonUtils; import org.springframework.integration.support.management.ManageableSmartLifecycle; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.KafkaOperations; @@ -29,6 +30,10 @@ import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.DefaultKafkaHeaderMapper; +import org.springframework.kafka.support.JacksonPresent; +import org.springframework.kafka.support.converter.MessagingMessageConverter; +import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.SubscribableChannel; import org.springframework.util.Assert; @@ -49,6 +54,8 @@ public class SubscribableKafkaChannel extends AbstractKafkaChannel implements Su private final KafkaListenerContainerFactory factory; + private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener(); + private MessageDispatcher dispatcher; private MessageListenerContainer container; @@ -71,6 +78,24 @@ public SubscribableKafkaChannel(KafkaOperations template, KafkaListenerCon super(template, channelTopic); Assert.notNull(factory, "'factory' cannot be null"); this.factory = factory; + + if (JacksonPresent.isJackson2Present()) { + var messageConverter = new MessagingMessageConverter(); + var headerMapper = new DefaultKafkaHeaderMapper(); + headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0])); + messageConverter.setHeaderMapper(headerMapper); + this.recordListener.setMessageConverter(messageConverter); + } + } + + + /** + * Set the {@link RecordMessageConverter} to the listener. + * @param messageConverter the converter. + * @since 6.0 + */ + public void setMessageConverter(RecordMessageConverter messageConverter) { + this.recordListener.setMessageConverter(messageConverter); } @Override @@ -113,7 +138,7 @@ protected void onInit() { String groupId = getGroupId(); ContainerProperties containerProperties = this.container.getContainerProperties(); containerProperties.setGroupId(groupId != null ? groupId : getBeanName()); - containerProperties.setMessageListener(new IntegrationRecordMessageListener()); + containerProperties.setMessageListener(this.recordListener); } protected MessageDispatcher createDispatcher() { diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java index f8b683120b9..2fe60724291 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java @@ -36,15 +36,19 @@ import org.springframework.integration.support.AbstractIntegrationMessageBuilder; import org.springframework.integration.support.ErrorMessageUtils; import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.support.json.JacksonJsonUtils; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.ConsumerSeekAware; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.DefaultKafkaHeaderMapper; +import org.springframework.kafka.support.JacksonPresent; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.converter.ConversionException; import org.springframework.kafka.support.converter.KafkaMessageHeaders; +import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -108,6 +112,13 @@ public KafkaInboundGateway(AbstractMessageListenerContainer messageListene this.messageListenerContainer.setAutoStartup(false); this.kafkaTemplate = kafkaTemplate; setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy()); + if (JacksonPresent.isJackson2Present()) { + MessagingMessageConverter messageConverter = new MessagingMessageConverter(); + DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(); + headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0])); + messageConverter.setHeaderMapper(headerMapper); + this.listener.setMessageConverter(messageConverter); + } } /** @@ -169,6 +180,7 @@ public void setRecoveryCallback(RecoveryCallback recoveryCallback) { */ public void setOnPartitionsAssignedSeekCallback( BiConsumer, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) { + this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback; } @@ -192,9 +204,6 @@ protected void onInit() { } } ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties(); - Object existing = containerProperties.getMessageListener(); - Assert.state(existing == null, () -> "listener container cannot have an existing message listener (" + existing - + ")"); containerProperties.setMessageListener(this.listener); this.containerDeliveryAttemptPresent = containerProperties.isDeliveryAttemptHeader(); } diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java index 931653492ba..66b3c9968f4 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java @@ -17,11 +17,11 @@ package org.springframework.integration.kafka.inbound; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; -import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -37,6 +37,7 @@ import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageUtils; import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.support.json.JacksonJsonUtils; import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.BatchMessageListener; import org.springframework.kafka.listener.ConsumerSeekAware; @@ -48,11 +49,14 @@ import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.DefaultKafkaHeaderMapper; +import org.springframework.kafka.support.JacksonPresent; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.ConversionException; import org.springframework.kafka.support.converter.KafkaMessageHeaders; import org.springframework.kafka.support.converter.MessageConverter; +import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -106,8 +110,6 @@ public class KafkaMessageDrivenChannelAdapter extends MessageProducerSuppo private boolean containerDeliveryAttemptPresent; - private boolean doFilterInRetry; - /** * Construct an instance with mode {@link ListenerMode#record}. * @param messageListenerContainer the container. @@ -131,6 +133,15 @@ public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer m this.messageListenerContainer.setAutoStartup(false); this.mode = mode; setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy()); + + if (JacksonPresent.isJackson2Present()) { + MessagingMessageConverter messageConverter = new MessagingMessageConverter(); + DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(); + headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0])); + messageConverter.setHeaderMapper(headerMapper); + this.recordListener.setMessageConverter(messageConverter); + this.batchListener.setMessageConverter(messageConverter); + } } /** @@ -149,7 +160,6 @@ else if (messageConverter instanceof BatchMessageConverter) { throw new IllegalArgumentException( "Message converter must be a 'RecordMessageConverter' or 'BatchMessageConverter'"); } - } /** @@ -280,14 +290,11 @@ protected void onInit() { super.onInit(); ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties(); - Object existing = containerProperties.getMessageListener(); - Assert.state(existing == null, () -> "listener container cannot have an existing message listener (" + existing - + ")"); if (this.mode.equals(ListenerMode.record)) { MessageListener listener = this.recordListener; - this.doFilterInRetry = this.filterInRetry && this.retryTemplate != null - && this.recordFilterStrategy != null; + boolean doFilterInRetry = + this.filterInRetry && this.retryTemplate != null && this.recordFilterStrategy != null; if (this.retryTemplate != null) { MessageChannel errorChannel = getErrorChannel(); @@ -296,7 +303,7 @@ protected void onInit() { } this.retryTemplate.registerListener(this.recordListener); } - if (!this.doFilterInRetry && this.recordFilterStrategy != null) { + if (!doFilterInRetry && this.recordFilterStrategy != null) { listener = new FilteringMessageListenerAdapter<>(listener, this.recordFilterStrategy, this.ackDiscarded); } @@ -597,7 +604,7 @@ private Message toMessage(List> records, Acknowledgment } catch (RuntimeException ex) { Exception exception = new ConversionException("Failed to convert to message", - records.stream().collect(Collectors.toList()), ex); + new ArrayList<>(records), ex); MessageChannel errorChannel = getErrorChannel(); if (errorChannel != null) { getMessagingTemplate().send(errorChannel, buildErrorMessage(message, exception)); diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java index 0f4720e8d04..7f621e0264a 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java @@ -51,12 +51,15 @@ import org.springframework.integration.core.Pausable; import org.springframework.integration.endpoint.AbstractMessageSource; import org.springframework.integration.support.AbstractIntegrationMessageBuilder; +import org.springframework.integration.support.json.JacksonJsonUtils; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ConsumerProperties; import org.springframework.kafka.listener.LoggingCommitCallback; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.DefaultKafkaHeaderMapper; +import org.springframework.kafka.support.JacksonPresent; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.LogIfLevelEnabled; @@ -233,6 +236,12 @@ public KafkaMessageSource(ConsumerFactory consumerFactory, this.assignTimeout = Duration.ofMillis(Math.max(this.pollTimeout.toMillis() * 20, MIN_ASSIGN_TIMEOUT)); // NOSONAR - magic this.commitTimeout = consumerProperties.getSyncCommitTimeout(); + + if (JacksonPresent.isJackson2Present()) { + DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(); + headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0])); + ((MessagingMessageConverter) this.messageConverter).setHeaderMapper(headerMapper); + } } /** diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/channnel/ChannelTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/channnel/ChannelTests.java index 3972f94d5f4..c11dc706a06 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/channnel/ChannelTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/channnel/ChannelTests.java @@ -29,6 +29,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.NullChannel; +import org.springframework.integration.history.MessageHistory; import org.springframework.integration.kafka.channel.PollableKafkaChannel; import org.springframework.integration.kafka.channel.PublishSubscribeKafkaChannel; import org.springframework.integration.kafka.channel.SubscribableKafkaChannel; @@ -53,6 +55,7 @@ /** * @author Gary Russell + * @author Artem Bilan * * @since 5.4 * @@ -70,10 +73,18 @@ void subscribablePtp(@Autowired SubscribableChannel ptp) throws InterruptedExcep latch.countDown(); }); Message msg = new GenericMessage<>("foo"); + NullChannel component = new NullChannel(); + component.setBeanName("myNullChannel"); + msg = MessageHistory.write(msg, component); ptp.send(msg, 10_000L); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(message.get().getPayload()).isEqualTo("foo"); - assertThat(message.get().getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("channel.1"); + Message received = message.get(); + assertThat(received.getPayload()).isEqualTo("foo"); + assertThat(received.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("channel.1"); + + MessageHistory messageHistory = MessageHistory.read(received); + assertThat(messageHistory).isNotNull(); + assertThat(messageHistory.toString()).isEqualTo("myNullChannel"); } @Test diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java index 43a12d74aa7..2c2b93c27f6 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java @@ -156,8 +156,6 @@ void testKafkaMessageDrivenChannelAdapterOptions() { messageListener = containerProps.getMessageListener(); assertThat(messageListener.getClass().getName()).contains("$IntegrationRecordMessageListener"); - - assertThat(adapter).extracting("doFilterInRetry").isEqualTo(Boolean.TRUE); } } diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java index ab5b8e9374c..93cea395b92 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java @@ -29,8 +29,6 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.BeanFactory; @@ -53,6 +51,7 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.ContainerTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.Message; @@ -76,38 +75,32 @@ * @since 5.4 * */ +@EmbeddedKafka(controlledShutdown = true, + topics = { InboundGatewayTests.topic1, + InboundGatewayTests.topic2, + InboundGatewayTests.topic3, + InboundGatewayTests.topic4, + InboundGatewayTests.topic5, + InboundGatewayTests.topic6, + InboundGatewayTests.topic7 }) class InboundGatewayTests { - private static String topic1 = "testTopic1"; + static final String topic1 = "testTopic1"; - private static String topic2 = "testTopic2"; + static final String topic2 = "testTopic2"; - private static String topic3 = "testTopic3"; + static final String topic3 = "testTopic3"; - private static String topic4 = "testTopic4"; + static final String topic4 = "testTopic4"; - private static String topic5 = "testTopic5"; + static final String topic5 = "testTopic5"; - private static String topic6 = "testTopic6"; + static final String topic6 = "testTopic6"; - private static String topic7 = "testTopic7"; - - private static EmbeddedKafkaBroker embeddedKafka; - - @BeforeAll - static void setup() { - embeddedKafka = new EmbeddedKafkaBroker(1, true, - topic1, topic2, topic3, topic4, topic5, topic6, topic7); - embeddedKafka.afterPropertiesSet(); - } - - @AfterAll - static void tearDown() { - embeddedKafka.destroy(); - } + static final String topic7 = "testTopic7"; @Test - void testInbound() throws Exception { + void testInbound(EmbeddedKafkaBroker embeddedKafka) throws Exception { Map consumerProps = KafkaTestUtils.consumerProps("replyHandler1", "false", embeddedKafka); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -199,7 +192,7 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle } @Test - void testInboundErrorRecover() { + void testInboundErrorRecover(EmbeddedKafkaBroker embeddedKafka) { Map consumerProps = KafkaTestUtils.consumerProps("replyHandler2", "false", embeddedKafka); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ConsumerFactory cf2 = new DefaultKafkaConsumerFactory<>(consumerProps); @@ -278,7 +271,7 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle } @Test - void testInboundRetryErrorRecover() { + void testInboundRetryErrorRecover(EmbeddedKafkaBroker embeddedKafka) { Map consumerProps = KafkaTestUtils.consumerProps("replyHandler3", "false", embeddedKafka); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ConsumerFactory cf2 = new DefaultKafkaConsumerFactory<>(consumerProps); @@ -362,7 +355,7 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle } @Test - void testInboundRetryErrorRecoverWithoutRecocveryCallback() throws Exception { + void testInboundRetryErrorRecoverWithoutRecoveryCallback(EmbeddedKafkaBroker embeddedKafka) throws Exception { Map consumerProps = KafkaTestUtils.consumerProps("replyHandler4", "false", embeddedKafka); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ConsumerFactory cf2 = new DefaultKafkaConsumerFactory<>(consumerProps); diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java index e5511b59db0..cb83fae6275 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java @@ -40,6 +40,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -49,15 +50,15 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.NullChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer; +import org.springframework.integration.history.MessageHistory; import org.springframework.integration.kafka.dsl.Kafka; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.ListenerMode; import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy; @@ -85,6 +86,7 @@ import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.converter.StringJsonMessageConverter; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.ContainerTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.Message; @@ -109,36 +111,29 @@ * @since 5.4 * */ +@EmbeddedKafka(controlledShutdown = true, + topics = { MessageDrivenAdapterTests.topic1, + MessageDrivenAdapterTests.topic2, + MessageDrivenAdapterTests.topic3, + MessageDrivenAdapterTests.topic4, + MessageDrivenAdapterTests.topic5, + MessageDrivenAdapterTests.topic6 }) class MessageDrivenAdapterTests { - private static String topic1 = "testTopic1"; + static final String topic1 = "testTopic1"; - private static String topic2 = "testTopic2"; + static final String topic2 = "testTopic2"; - private static String topic3 = "testTopic3"; + static final String topic3 = "testTopic3"; - private static String topic4 = "testTopic4"; + static final String topic4 = "testTopic4"; - private static String topic5 = "testTopic5"; + static final String topic5 = "testTopic5"; - private static String topic6 = "testTopic6"; - - private static EmbeddedKafkaBroker embeddedKafka; - - @BeforeAll - static void setup() { - embeddedKafka = new EmbeddedKafkaBroker(1, true, - topic1, topic2, topic3, topic4, topic5, topic6); - embeddedKafka.afterPropertiesSet(); - } - - @AfterAll - static void tearDown() { - embeddedKafka.destroy(); - } + static final String topic6 = "testTopic6"; @Test - void testInboundRecord() { + void testInboundRecord(EmbeddedKafkaBroker embeddedKafka) { Map props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); @@ -224,7 +219,7 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle } @Test - void testInboundRecordRetryRecover() { + void testInboundRecordRetryRecover(EmbeddedKafkaBroker embeddedKafka) { Map props = KafkaTestUtils.consumerProps("test4", "true", embeddedKafka); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); @@ -232,10 +227,12 @@ void testInboundRecordRetryRecover() { KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); KafkaMessageDrivenChannelAdapter adapter = new KafkaMessageDrivenChannelAdapter<>(container); + AtomicReference receivedMessageHistory = new AtomicReference<>(); MessageChannel out = new DirectChannel() { @Override protected boolean doSend(Message message, long timeout) { + receivedMessageHistory.set(MessageHistory.read(message)); throw new RuntimeException("intended"); } @@ -257,7 +254,11 @@ protected boolean doSend(Message message, long timeout) { ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(topic4); - template.sendDefault(1, "foo"); + Message msg = MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, 1).build(); + NullChannel component = new NullChannel(); + component.setBeanName("myNullChannel"); + msg = MessageHistory.write(msg, component); + template.send(msg); Message received = errorChannel.receive(10000); assertThat(received).isInstanceOf(ErrorMessage.class); @@ -273,6 +274,9 @@ protected boolean doSend(Message message, long timeout) { assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L); assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(originalMessage).get()).isEqualTo(2); + assertThat(receivedMessageHistory.get()).isNotNull(); + assertThat(receivedMessageHistory.get().toString()).isEqualTo("myNullChannel"); + adapter.stop(); } @@ -282,7 +286,7 @@ protected boolean doSend(Message message, long timeout) { * to the consumer. */ @Test - void testInboundRecordRetryRecoverWithoutRecoveryCallback() throws Exception { + void testInboundRecordRetryRecoverWithoutRecoveryCallback(EmbeddedKafkaBroker embeddedKafka) throws Exception { Map props = KafkaTestUtils.consumerProps("test6", "true", embeddedKafka); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); @@ -332,7 +336,7 @@ public void onError(RetryContext context, RetryCallback } @Test - void testInboundRecordNoRetryRecover() { + void testInboundRecordNoRetryRecover(EmbeddedKafkaBroker embeddedKafka) { Map props = KafkaTestUtils.consumerProps("test5", "true", embeddedKafka); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); @@ -387,7 +391,7 @@ protected boolean doSend(Message message, long timeout) { } @Test - void testInboundBatch() throws Exception { + void testInboundBatch(EmbeddedKafkaBroker embeddedKafka) throws Exception { Map props = KafkaTestUtils.consumerProps("test2", "true", embeddedKafka); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); @@ -475,7 +479,7 @@ public Message toMessage(List> records, Acknowledgment a } @Test - void testInboundJson() { + void testInboundJson(EmbeddedKafkaBroker embeddedKafka) { Map props = KafkaTestUtils.consumerProps("test3", "true", embeddedKafka); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); @@ -517,7 +521,7 @@ void testInboundJson() { } @Test - void testInboundJsonWithPayload() { + void testInboundJsonWithPayload(EmbeddedKafkaBroker embeddedKafka) { Map props = KafkaTestUtils.consumerProps("test6", "true", embeddedKafka); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceIntegrationTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceIntegrationTests.java index 1a11c39e072..1df46cec3d6 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceIntegrationTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceIntegrationTests.java @@ -26,17 +26,19 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.springframework.integration.channel.NullChannel; +import org.springframework.integration.history.MessageHistory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ConsumerProperties; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; /** * @author Gary Russell @@ -46,25 +48,13 @@ * @since 5.4 * */ +@EmbeddedKafka(controlledShutdown = true, topics = MessageSourceIntegrationTests.TOPIC1, partitions = 1) class MessageSourceIntegrationTests { - private static final String TOPIC1 = "MessageSourceIntegrationTests1"; - - private static EmbeddedKafkaBroker embeddedKafka; - - @BeforeAll - static void setup() { - embeddedKafka = new EmbeddedKafkaBroker(1, true, 1, TOPIC1); - embeddedKafka.afterPropertiesSet(); - } - - @AfterAll - static void tearDown() { - embeddedKafka.destroy(); - } + static final String TOPIC1 = "MessageSourceIntegrationTests1"; @Test - void testSource() throws Exception { + void testSource(EmbeddedKafkaBroker embeddedKafka) throws Exception { Map consumerProps = KafkaTestUtils.consumerProps("testSource", "false", embeddedKafka); consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -94,10 +84,16 @@ public void onPartitionsAssigned(Collection partitions) { Map producerProps = KafkaTestUtils.producerProps(embeddedKafka); DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(producerProps); KafkaTemplate template = new KafkaTemplate<>(producerFactory); - template.send(TOPIC1, "foo"); - template.send(TOPIC1, "bar"); - template.send(TOPIC1, "baz"); - template.send(TOPIC1, "qux"); + template.setDefaultTopic(TOPIC1); + template.sendDefault("foo"); + template.sendDefault("bar"); + template.sendDefault("baz"); + template.sendDefault("qux"); + Message msg = new GenericMessage<>("msg"); + NullChannel component = new NullChannel(); + component.setBeanName("myNullChannel"); + msg = MessageHistory.write(msg, component); + template.send(msg); Message received = source.receive(); assertThat(assigned.await(10, TimeUnit.SECONDS)).isTrue(); int n = 0; @@ -116,6 +112,12 @@ public void onPartitionsAssigned(Collection partitions) { assertThat(received).isNotNull(); assertThat(received.getPayload()).isEqualTo("qux"); received = source.receive(); + assertThat(received).isNotNull(); + assertThat(received.getPayload()).isEqualTo("msg"); + MessageHistory messageHistory = MessageHistory.read(received); + assertThat(messageHistory).isNotNull(); + assertThat(messageHistory.toString()).isEqualTo("myNullChannel"); + received = source.receive(); assertThat(received).isNull(); assertThat(KafkaTestUtils.getPropertyValue(source, "consumer.fetcher.minBytes")).isEqualTo(2); source.destroy();