Skip to content

Commit a96b5f0

Browse files
committed
GH-9647: Add AmqpInboundChannelAdapter.setHeaderNameForBatchedHeaders()
Fixes: #9647 Issue link: #9647
1 parent 29cd216 commit a96b5f0

File tree

2 files changed

+56
-38
lines changed

2 files changed

+56
-38
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ public enum BatchMode {
124124

125125
private BatchMode batchMode = BatchMode.MESSAGES;
126126

127+
private String headerNameForBatchedHeaders = CONSOLIDATED_HEADERS;
128+
127129
/**
128130
* Construct an instance using the provided container.
129131
* @param listenerContainer the container.
@@ -137,7 +139,8 @@ public AmqpInboundChannelAdapter(MessageListenerContainer listenerContainer) {
137139
this.messageListenerContainer = listenerContainer;
138140
this.messageListenerContainer.setAutoStartup(false);
139141
setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy());
140-
this.abstractListenerContainer = listenerContainer instanceof AbstractMessageListenerContainer abstractMessageListenerContainer
142+
this.abstractListenerContainer =
143+
listenerContainer instanceof AbstractMessageListenerContainer abstractMessageListenerContainer
141144
? abstractMessageListenerContainer
142145
: null;
143146
}
@@ -220,6 +223,20 @@ public void setBatchMode(BatchMode batchMode) {
220223
this.batchMode = batchMode;
221224
}
222225

226+
/**
227+
* Set a header name containing {@code List<Map<String, Object>} headers when batch mode
228+
* is {@link BatchMode#EXTRACT_PAYLOADS_WITH_HEADERS}.
229+
* Defaults to {@link #CONSOLIDATED_HEADERS}.
230+
* @param headerNameForBatchedHeaders the name of header
231+
* containing {@code List<Map<String, Object>} headers when batch mode
232+
* is {@link BatchMode#EXTRACT_PAYLOADS_WITH_HEADERS}.
233+
* @since 6.4
234+
*/
235+
public void setHeaderNameForBatchedHeaders(String headerNameForBatchedHeaders) {
236+
Assert.hasText(headerNameForBatchedHeaders, "'headerNameForBatchedHeaders' must not be empty");
237+
this.headerNameForBatchedHeaders = headerNameForBatchedHeaders;
238+
}
239+
223240
@Override
224241
public String getComponentType() {
225242
return "amqp:inbound-channel-adapter";
@@ -436,7 +453,7 @@ protected org.springframework.messaging.Message<Object> createMessageFromPayload
436453
headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
437454
}
438455
if (listHeaders != null) {
439-
headers.put(CONSOLIDATED_HEADERS, listHeaders);
456+
headers.put(AmqpInboundChannelAdapter.this.headerNameForBatchedHeaders, listHeaders);
440457
}
441458
return getMessageBuilderFactory()
442459
.withPayload(payload)

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@
7575
import static org.mockito.ArgumentMatchers.anyString;
7676
import static org.mockito.ArgumentMatchers.isNull;
7777
import static org.mockito.BDDMockito.given;
78-
import static org.mockito.Mockito.doAnswer;
78+
import static org.mockito.BDDMockito.willReturn;
7979
import static org.mockito.Mockito.mock;
8080
import static org.mockito.Mockito.spy;
8181
import static org.mockito.Mockito.when;
@@ -90,9 +90,9 @@ public class InboundEndpointTests {
9090

9191
@Test
9292
public void testInt2809JavaTypePropertiesToAmqp() throws Exception {
93-
Connection connection = mock(Connection.class);
94-
doAnswer(invocation -> mock(Channel.class)).when(connection).createChannel(anyBoolean());
95-
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
93+
Connection connection = mock();
94+
willReturn(mock(Channel.class)).given(connection).createChannel(anyBoolean());
95+
ConnectionFactory connectionFactory = mock();
9696
when(connectionFactory.createConnection()).thenReturn(connection);
9797
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
9898
container.setConnectionFactory(connectionFactory);
@@ -104,7 +104,7 @@ public void testInt2809JavaTypePropertiesToAmqp() throws Exception {
104104
PollableChannel channel = new QueueChannel();
105105

106106
adapter.setOutputChannel(channel);
107-
adapter.setBeanFactory(mock(BeanFactory.class));
107+
adapter.setBeanFactory(mock());
108108
adapter.setBindSourceMessage(true);
109109
adapter.afterPropertiesSet();
110110

@@ -134,9 +134,9 @@ public void testInt2809JavaTypePropertiesToAmqp() throws Exception {
134134

135135
@Test
136136
public void testInt2809JavaTypePropertiesFromAmqp() throws Exception {
137-
Connection connection = mock(Connection.class);
138-
doAnswer(invocation -> mock(Channel.class)).when(connection).createChannel(anyBoolean());
139-
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
137+
Connection connection = mock();
138+
willReturn(mock(Channel.class)).given(connection).createChannel(anyBoolean());
139+
ConnectionFactory connectionFactory = mock();
140140
when(connectionFactory.createConnection()).thenReturn(connection);
141141
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
142142
container.setConnectionFactory(connectionFactory);
@@ -169,9 +169,9 @@ public void testInt2809JavaTypePropertiesFromAmqp() throws Exception {
169169

170170
@Test
171171
public void testMessageConverterJsonHeadersHavePrecedenceOverMessageHeaders() throws Exception {
172-
Connection connection = mock(Connection.class);
173-
doAnswer(invocation -> mock(Channel.class)).when(connection).createChannel(anyBoolean());
174-
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
172+
Connection connection = mock();
173+
willReturn(mock(Channel.class)).given(connection).createChannel(anyBoolean());
174+
ConnectionFactory connectionFactory = mock();
175175
when(connectionFactory.createConnection()).thenReturn(connection);
176176
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
177177
container.setConnectionFactory(connectionFactory);
@@ -235,9 +235,9 @@ public void testMessageConverterJsonHeadersHavePrecedenceOverMessageHeaders() th
235235

236236
@Test
237237
public void testAdapterConversionError() throws Exception {
238-
Connection connection = mock(Connection.class);
239-
doAnswer(invocation -> mock(Channel.class)).when(connection).createChannel(anyBoolean());
240-
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
238+
Connection connection = mock();
239+
willReturn(mock(Channel.class)).given(connection).createChannel(anyBoolean());
240+
ConnectionFactory connectionFactory = mock();
241241
when(connectionFactory.createConnection()).thenReturn(connection);
242242
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
243243
container.setConnectionFactory(connectionFactory);
@@ -285,9 +285,9 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws
285285

286286
@Test
287287
public void testGatewayConversionError() throws Exception {
288-
Connection connection = mock(Connection.class);
289-
doAnswer(invocation -> mock(Channel.class)).when(connection).createChannel(anyBoolean());
290-
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
288+
Connection connection = mock();
289+
willReturn(mock(Channel.class)).given(connection).createChannel(anyBoolean());
290+
ConnectionFactory connectionFactory = mock();
291291
when(connectionFactory.createConnection()).thenReturn(connection);
292292
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
293293
container.setConnectionFactory(connectionFactory);
@@ -339,7 +339,7 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws
339339

340340
@Test
341341
public void testRetryWithinOnMessageAdapter() throws Exception {
342-
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
342+
ConnectionFactory connectionFactory = mock();
343343
AbstractMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
344344
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
345345
adapter.setOutputChannel(new DirectChannel());
@@ -367,7 +367,7 @@ public void testRetryWithinOnMessageAdapter() throws Exception {
367367

368368
@Test
369369
public void testRetryWithMessageRecovererOnMessageAdapter() throws Exception {
370-
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
370+
ConnectionFactory connectionFactory = mock();
371371
AbstractMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
372372
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
373373
adapter.setOutputChannel(new DirectChannel());
@@ -400,7 +400,7 @@ public void testRetryWithMessageRecovererOnMessageAdapter() throws Exception {
400400

401401
@Test
402402
public void testRetryWithinOnMessageGateway() throws Exception {
403-
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
403+
ConnectionFactory connectionFactory = mock();
404404
AbstractMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
405405
AmqpInboundGateway adapter = new AmqpInboundGateway(container);
406406
adapter.setRequestChannel(new DirectChannel());
@@ -428,7 +428,7 @@ public void testRetryWithinOnMessageGateway() throws Exception {
428428

429429
@Test
430430
public void testRetryWithMessageRecovererOnMessageGateway() throws Exception {
431-
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
431+
ConnectionFactory connectionFactory = mock();
432432
AbstractMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
433433
AmqpInboundGateway adapter = new AmqpInboundGateway(container);
434434
adapter.setRequestChannel(new DirectChannel());
@@ -462,7 +462,7 @@ public void testRetryWithMessageRecovererOnMessageGateway() throws Exception {
462462
@SuppressWarnings({"unchecked"})
463463
@Test
464464
public void testBatchAdapter() throws Exception {
465-
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class));
465+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock());
466466
container.setDeBatchingEnabled(false);
467467
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
468468
QueueChannel out = new QueueChannel();
@@ -486,7 +486,7 @@ public void testBatchAdapter() throws Exception {
486486
@SuppressWarnings({"unchecked"})
487487
@Test
488488
public void testBatchGateway() throws Exception {
489-
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class));
489+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock());
490490
container.setDeBatchingEnabled(false);
491491
AmqpInboundGateway gateway = new AmqpInboundGateway(container);
492492
QueueChannel out = new QueueChannel();
@@ -514,12 +514,13 @@ public void testBatchGateway() throws Exception {
514514
@SuppressWarnings({"unchecked"})
515515
@Test
516516
public void testConsumerBatchExtract() {
517-
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class));
517+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock());
518518
container.setConsumerBatchEnabled(true);
519519
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
520520
QueueChannel out = new QueueChannel();
521521
adapter.setOutputChannel(out);
522522
adapter.setBatchMode(BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS);
523+
adapter.setHeaderNameForBatchedHeaders("some_batch_headers");
523524
adapter.afterPropertiesSet();
524525
ChannelAwareBatchMessageListener listener = (ChannelAwareBatchMessageListener) container.getMessageListener();
525526
MessageProperties messageProperties = new MessageProperties();
@@ -531,14 +532,14 @@ public void testConsumerBatchExtract() {
531532
Message<?> received = out.receive(0);
532533
assertThat(received).isNotNull();
533534
assertThat(((List<String>) received.getPayload())).contains("test1", "test2");
534-
assertThat(received.getHeaders().get(AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS, List.class))
535+
assertThat(received.getHeaders().get("some_batch_headers", List.class))
535536
.hasSize(2);
536537
}
537538

538539
@SuppressWarnings({"unchecked"})
539540
@Test
540541
public void testConsumerBatch() {
541-
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class));
542+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock());
542543
container.setConsumerBatchEnabled(true);
543544
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
544545
QueueChannel out = new QueueChannel();
@@ -560,7 +561,7 @@ public void testConsumerBatch() {
560561

561562
@Test
562563
public void testConsumerBatchAndWrongMessageRecoverer() {
563-
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class));
564+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock());
564565
container.setConsumerBatchEnabled(true);
565566
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
566567
adapter.setRetryTemplate(new RetryTemplate());
@@ -574,7 +575,7 @@ public void testConsumerBatchAndWrongMessageRecoverer() {
574575

575576
@Test
576577
public void testExclusiveRecover() {
577-
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(mock(AbstractMessageListenerContainer.class));
578+
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(mock());
578579
adapter.setRetryTemplate(new RetryTemplate());
579580
adapter.setMessageRecoverer((message, cause) -> {
580581
});
@@ -587,9 +588,9 @@ public void testExclusiveRecover() {
587588

588589
@Test
589590
public void testAdapterConversionErrorConsumerBatchExtract() {
590-
Connection connection = mock(Connection.class);
591-
doAnswer(invocation -> mock(Channel.class)).when(connection).createChannel(anyBoolean());
592-
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
591+
Connection connection = mock();
592+
willReturn(mock(Channel.class)).given(connection).createChannel(anyBoolean());
593+
ConnectionFactory connectionFactory = mock();
593594
when(connectionFactory.createConnection()).thenReturn(connection);
594595
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
595596
container.setConnectionFactory(connectionFactory);
@@ -644,9 +645,9 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws
644645

645646
@Test
646647
public void testAdapterConversionErrorConsumerBatch() {
647-
Connection connection = mock(Connection.class);
648-
doAnswer(invocation -> mock(Channel.class)).when(connection).createChannel(anyBoolean());
649-
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
648+
Connection connection = mock();
649+
willReturn(mock(Channel.class)).given(connection).createChannel(anyBoolean());
650+
ConnectionFactory connectionFactory = mock();
650651
when(connectionFactory.createConnection()).thenReturn(connection);
651652
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
652653
container.setConnectionFactory(connectionFactory);
@@ -700,7 +701,7 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws
700701

701702
@Test
702703
public void testRetryWithinOnMessageAdapterConsumerBatch() {
703-
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
704+
ConnectionFactory connectionFactory = mock();
704705
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
705706
container.setConsumerBatchEnabled(true);
706707
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
@@ -746,7 +747,7 @@ public void testRetryWithinOnMessageAdapterConsumerBatch() {
746747

747748
@Test
748749
public void testRetryWithMessageRecovererOnMessageAdapterConsumerBatch() throws InterruptedException {
749-
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
750+
ConnectionFactory connectionFactory = mock();
750751
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
751752
container.setConsumerBatchEnabled(true);
752753
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);

0 commit comments

Comments
 (0)