Skip to content

Commit e2557a4

Browse files
garyrussellartembilan
authored andcommitted
GH-3617: KPMH - Option To Use Template's Converter
Resolves #3617 Optionally use the `KafkaTemplate` message converter instead of the default (or supplied) `ProducerRecordCreator`.
1 parent 0410343 commit e2557a4

File tree

3 files changed

+84
-2
lines changed

3 files changed

+84
-2
lines changed

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaProducerMessageHandlerSpec.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,13 +21,16 @@
2121
import java.util.function.Consumer;
2222
import java.util.function.Function;
2323

24+
import org.apache.kafka.clients.producer.ProducerRecord;
25+
2426
import org.springframework.expression.Expression;
2527
import org.springframework.expression.common.LiteralExpression;
2628
import org.springframework.integration.dsl.ComponentsRegistration;
2729
import org.springframework.integration.dsl.MessageHandlerSpec;
2830
import org.springframework.integration.expression.FunctionExpression;
2931
import org.springframework.integration.expression.ValueExpression;
3032
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
33+
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.ProducerRecordCreator;
3134
import org.springframework.kafka.core.KafkaTemplate;
3235
import org.springframework.kafka.core.ProducerFactory;
3336
import org.springframework.kafka.support.KafkaHeaderMapper;
@@ -375,6 +378,31 @@ public S futuresChannel(String futuresChannel) {
375378
return _this();
376379
}
377380

381+
/**
382+
* Set a {@link ProducerRecordCreator} to create the {@link ProducerRecord}. Ignored
383+
* if {@link #useTemplateConverter(boolean) useTemplateConverter} is true.
384+
* @param creator the creator.
385+
* @return the spec.
386+
* @since 5.5.5
387+
*/
388+
public S producerRecordCreator(ProducerRecordCreator<K, V> creator) {
389+
this.target.setProducerRecordCreator(creator);
390+
return _this();
391+
}
392+
393+
/**
394+
* Set to true to use the template's message converter to create the
395+
* {@link ProducerRecord} instead of the
396+
* {@link #producerRecordCreator(ProducerRecordCreator) producerRecordCreator}.
397+
* @param use true to use the converter.
398+
* @return the spec.
399+
* @since 5.5.5
400+
*/
401+
public S useTemplateConverter(boolean use) {
402+
this.target.setUseTemplateConverter(use);
403+
return _this();
404+
}
405+
378406
/**
379407
* A {@link KafkaTemplate}-based {@link KafkaProducerMessageHandlerSpec} extension.
380408
*

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes
163163

164164
private int timeoutBuffer = DEFAULT_TIMEOUT_BUFFER;
165165

166+
private boolean useTemplateConverter;
167+
166168
private volatile byte[] singleReplyTopic;
167169

168170
public KafkaProducerMessageHandler(final KafkaTemplate<K, V> kafkaTemplate) {
@@ -383,8 +385,10 @@ public void setReplyPayloadType(Type payloadType) {
383385
}
384386

385387
/**
386-
* Set a {@link ProducerRecordCreator} to create the {@link ProducerRecord}.
388+
* Set a {@link ProducerRecordCreator} to create the {@link ProducerRecord}. Ignored
389+
* if {@link #setUseTemplateConverter(boolean) useTemplateConverter} is true.
387390
* @param producerRecordCreator the creator.
391+
* @see #setUseTemplateConverter(boolean)
388392
*/
389393
public void setProducerRecordCreator(ProducerRecordCreator<K, V> producerRecordCreator) {
390394
Assert.notNull(producerRecordCreator, "'producerRecordCreator' cannot be null");
@@ -402,6 +406,18 @@ public void setTimeoutBuffer(int timeoutBuffer) {
402406
this.timeoutBuffer = timeoutBuffer;
403407
}
404408

409+
/**
410+
* Set to true to use the template's message converter to create the
411+
* {@link ProducerRecord} instead of the
412+
* {@link #setProducerRecordCreator(ProducerRecordCreator) producerRecordCreator}.
413+
* @param useTemplateConverter true to use the converter.
414+
* @since 5.5.5
415+
* @see #setProducerRecordCreator(ProducerRecordCreator)
416+
*/
417+
public void setUseTemplateConverter(boolean useTemplateConverter) {
418+
this.useTemplateConverter = useTemplateConverter;
419+
}
420+
405421
@Override
406422
public String getComponentType() {
407423
return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter";
@@ -541,6 +557,9 @@ private ProducerRecord<K, V> createProducerRecord(final Message<?> message) {
541557
if (topic == null) {
542558
topic = this.kafkaTemplate.getDefaultTopic();
543559
}
560+
if (this.useTemplateConverter) {
561+
return (ProducerRecord<K, V>) this.kafkaTemplate.getMessageConverter().fromMessage(message, topic);
562+
}
544563

545564
Assert.state(StringUtils.hasText(topic), "The 'topic' can not be empty or null");
546565

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.springframework.integration.expression.FunctionExpression;
7474
import org.springframework.integration.expression.ValueExpression;
7575
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
76+
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.ProducerRecordCreator;
7677
import org.springframework.integration.kafka.support.KafkaIntegrationHeaders;
7778
import org.springframework.integration.kafka.support.KafkaSendFailureException;
7879
import org.springframework.integration.support.MessageBuilder;
@@ -90,6 +91,7 @@
9091
import org.springframework.kafka.support.KafkaNull;
9192
import org.springframework.kafka.support.SendResult;
9293
import org.springframework.kafka.support.TransactionSupport;
94+
import org.springframework.kafka.support.converter.RecordMessageConverter;
9395
import org.springframework.kafka.test.EmbeddedKafkaBroker;
9496
import org.springframework.kafka.test.utils.KafkaTestUtils;
9597
import org.springframework.kafka.transaction.KafkaTransactionManager;
@@ -783,6 +785,39 @@ void testNoFlush() {
783785
handler.stop();
784786
}
785787

788+
@SuppressWarnings({ "rawtypes", "unchecked" })
789+
@Test
790+
void conversion() {
791+
ProducerFactory pf = mock(ProducerFactory.class);
792+
Producer producer = mock(Producer.class);
793+
given(pf.createProducer()).willReturn(producer);
794+
ListenableFuture future = mock(ListenableFuture.class);
795+
willReturn(future).given(producer).send(any(ProducerRecord.class), any(Callback.class));
796+
KafkaTemplate template = new KafkaTemplate(pf);
797+
RecordMessageConverter converter = mock(RecordMessageConverter.class);
798+
ProducerRecord recordFromConverter = mock(ProducerRecord.class);
799+
given(converter.fromMessage(any(), any())).willReturn(recordFromConverter);
800+
template.setMessageConverter(converter);
801+
KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(template);
802+
handler.setTopicExpression(new LiteralExpression("bar"));
803+
handler.setBeanFactory(mock(BeanFactory.class));
804+
ProducerRecordCreator creator = mock(ProducerRecordCreator.class);
805+
ProducerRecord recordFromCreator = mock(ProducerRecord.class);
806+
given(creator.create(any(), any(), any(), any(), any(), any(), any())).willReturn(recordFromCreator);
807+
handler.setProducerRecordCreator(creator);
808+
handler.afterPropertiesSet();
809+
handler.start();
810+
handler.handleMessage(new GenericMessage<>("foo"));
811+
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
812+
verify(producer).send(captor.capture(), any(Callback.class));
813+
assertThat(captor.getValue()).isSameAs(recordFromCreator);
814+
handler.setUseTemplateConverter(true);
815+
handler.handleMessage(new GenericMessage<>("foo"));
816+
verify(producer, times(2)).send(captor.capture(), any(Callback.class));
817+
assertThat(captor.getValue()).isSameAs(recordFromConverter);
818+
handler.stop();
819+
}
820+
786821
@SuppressWarnings("serial")
787822
static class SomeOtherTransactionManager extends AbstractPlatformTransactionManager {
788823

0 commit comments

Comments
 (0)