Skip to content

Commit 22b74c7

Browse files
committed
GH-3790: Use new header constants for Kafka headers
Fixes #3790 Some `KafkaHeaders` constants have been removed and replaced with new more meaningful * Fix removed constants everywhere in the code and docs in favor of newly introduced, which replaces old
1 parent da5d2ca commit 22b74c7

File tree

8 files changed

+59
-59
lines changed

8 files changed

+59
-59
lines changed

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -416,12 +416,12 @@ private Message<?> enhanceReply(Message<?> message, Message<?> reply) {
416416
}
417417
builder.setHeader(KafkaHeaders.TOPIC, requestHeaders.get(KafkaHeaders.REPLY_TOPIC));
418418
}
419-
if (replyHeaders.get(KafkaHeaders.PARTITION_ID) == null &&
419+
if (replyHeaders.get(KafkaHeaders.PARTITION) == null &&
420420
requestHeaders.get(KafkaHeaders.REPLY_PARTITION) != null) {
421421
if (builder == null) {
422422
builder = getMessageBuilderFactory().fromMessage(reply);
423423
}
424-
builder.setHeader(KafkaHeaders.PARTITION_ID, requestHeaders.get(KafkaHeaders.REPLY_PARTITION));
424+
builder.setHeader(KafkaHeaders.PARTITION, requestHeaders.get(KafkaHeaders.REPLY_PARTITION));
425425
}
426426
if (builder != null) {
427427
return builder.build();

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2021 the original author or authors.
2+
* Copyright 2013-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -174,7 +174,7 @@ public KafkaProducerMessageHandler(final KafkaTemplate<K, V> kafkaTemplate) {
174174
if (this.isGateway) {
175175
setAsync(true);
176176
updateNotPropagatedHeaders(
177-
new String[]{KafkaHeaders.TOPIC, KafkaHeaders.PARTITION_ID, KafkaHeaders.MESSAGE_KEY}, false);
177+
new String[]{KafkaHeaders.TOPIC, KafkaHeaders.PARTITION, KafkaHeaders.KEY}, false);
178178
}
179179
if (JacksonPresent.isJackson2Present()) {
180180
this.headerMapper = new DefaultKafkaHeaderMapper();
@@ -565,11 +565,11 @@ private ProducerRecord<K, V> createProducerRecord(final Message<?> message) {
565565

566566
Integer partitionId = this.partitionIdExpression != null ?
567567
this.partitionIdExpression.getValue(this.evaluationContext, message, Integer.class)
568-
: messageHeaders.get(KafkaHeaders.PARTITION_ID, Integer.class);
568+
: messageHeaders.get(KafkaHeaders.PARTITION, Integer.class);
569569

570570
Object messageKey = this.messageKeyExpression != null
571571
? this.messageKeyExpression.getValue(this.evaluationContext, message)
572-
: messageHeaders.get(KafkaHeaders.MESSAGE_KEY);
572+
: messageHeaders.get(KafkaHeaders.KEY);
573573

574574
Long timestamp = this.timestampExpression != null
575575
? this.timestampExpression.getValue(this.evaluationContext, message, Long.class)

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ void testKafkaAdapters() throws Exception {
196196
Acknowledgment acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
197197
acknowledgment.acknowledge();
198198
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(TEST_TOPIC1);
199-
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(i + 1);
200-
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
199+
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(i + 1);
200+
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
201201
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo((long) i);
202202
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
203203
assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048633L);
@@ -213,8 +213,8 @@ void testKafkaAdapters() throws Exception {
213213
Acknowledgment acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
214214
acknowledgment.acknowledge();
215215
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(TEST_TOPIC2);
216-
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(i + 1);
217-
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
216+
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(i + 1);
217+
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
218218
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo((long) i);
219219
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
220220
assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048644L);
@@ -313,7 +313,7 @@ public IntegrationFlow topic1ListenerFromKafkaFlow() {
313313
.onPartitionsAssignedSeekCallback((map, callback) ->
314314
ContextConfiguration.this.onPartitionsAssignedCalledLatch.countDown()))
315315
.filter(Message.class, m ->
316-
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
316+
m.getHeaders().get(KafkaHeaders.RECEIVED_KEY, Integer.class) < 101,
317317
f -> f.throwExceptionOnRejection(true))
318318
.<String, String>transform(String::toUpperCase)
319319
.channel(c -> c.queue("listeningFromKafkaResults1"))
@@ -336,7 +336,7 @@ public IntegrationFlow topic2ListenerFromKafkaFlow() {
336336
.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
337337
KafkaMessageDrivenChannelAdapter.ListenerMode.record))
338338
.filter(Message.class, m ->
339-
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
339+
m.getHeaders().get(KafkaHeaders.RECEIVED_KEY, Integer.class) < 101,
340340
f -> f.throwExceptionOnRejection(true))
341341
.<String, String>transform(String::toUpperCase)
342342
.channel(c -> c.queue("listeningFromKafkaResults2"))

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,9 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
166166
assertThat(received).isNotNull();
167167

168168
MessageHeaders headers = received.getHeaders();
169-
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1);
169+
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
170170
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic1);
171-
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
171+
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
172172
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
173173
assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048607L);
174174
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
@@ -254,9 +254,9 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
254254
MessageHeaders headers = failed.getHeaders();
255255
reply.send(MessageBuilder.withPayload("ERROR").copyHeaders(headers).build());
256256

257-
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1);
257+
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
258258
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic3);
259-
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
259+
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
260260
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
261261
assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048607L);
262262
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
@@ -338,9 +338,9 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
338338
MessageHeaders headers = failed.getHeaders();
339339
reply.send(MessageBuilder.withPayload("ERROR").copyHeaders(headers).build());
340340

341-
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1);
341+
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
342342
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic5);
343-
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
343+
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
344344
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
345345
assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048607L);
346346
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

+15-15
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,9 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
167167
assertThat(received).isNotNull();
168168

169169
MessageHeaders headers = received.getHeaders();
170-
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1);
170+
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
171171
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic1);
172-
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
172+
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
173173
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
174174
assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048607L);
175175
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
@@ -183,9 +183,9 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
183183
assertThat(received.getPayload()).isInstanceOf(KafkaNull.class);
184184

185185
headers = received.getHeaders();
186-
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1);
186+
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
187187
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic1);
188-
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
188+
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
189189
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(1L);
190190
assertThat((Long) headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isGreaterThan(0L);
191191
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
@@ -254,7 +254,7 @@ protected boolean doSend(Message<?> message, long timeout) {
254254
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
255255
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
256256
template.setDefaultTopic(topic4);
257-
Message<?> msg = MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, 1).build();
257+
Message<?> msg = MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.KEY, 1).build();
258258
NullChannel component = new NullChannel();
259259
component.setBeanName("myNullChannel");
260260
msg = MessageHistory.write(msg, component);
@@ -268,9 +268,9 @@ protected boolean doSend(Message<?> message, long timeout) {
268268
assertThat(originalMessage).isNotNull();
269269
assertThat(originalMessage.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA)).isNull();
270270
headers = originalMessage.getHeaders();
271-
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1);
271+
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
272272
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic4);
273-
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
273+
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
274274
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
275275
assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(originalMessage).get()).isEqualTo(2);
276276

@@ -381,9 +381,9 @@ protected boolean doSend(Message<?> message, long timeout) {
381381
assertThat(originalMessage.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA))
382382
.isSameAs(headers.get(KafkaHeaders.RAW_DATA));
383383
headers = originalMessage.getHeaders();
384-
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1);
384+
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
385385
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic5);
386-
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
386+
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
387387
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
388388
assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(originalMessage).get()).isEqualTo(1);
389389

@@ -438,9 +438,9 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
438438
assertThat(list.size()).isGreaterThan(0);
439439

440440
MessageHeaders headers = received.getHeaders();
441-
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(Arrays.asList(1, 1));
441+
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(Arrays.asList(1, 1));
442442
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(Arrays.asList("testTopic2", "testTopic2"));
443-
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(Arrays.asList(0, 0));
443+
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(Arrays.asList(0, 0));
444444
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(Arrays.asList(0L, 1L));
445445
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE))
446446
.isEqualTo(Arrays.asList("CREATE_TIME", "CREATE_TIME"));
@@ -507,9 +507,9 @@ void testInboundJson(EmbeddedKafkaBroker embeddedKafka) {
507507
assertThat(received).isNotNull();
508508

509509
MessageHeaders headers = received.getHeaders();
510-
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1);
510+
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
511511
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic3);
512-
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
512+
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
513513
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
514514

515515
assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048607L);
@@ -554,9 +554,9 @@ void testInboundJsonWithPayload(EmbeddedKafkaBroker embeddedKafka) {
554554
assertThat(received).isNotNull();
555555

556556
MessageHeaders headers = received.getHeaders();
557-
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1);
557+
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
558558
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic6);
559-
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
559+
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
560560
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
561561
assertThat((Long) headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isGreaterThan(0L);
562562
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java

+18-18
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ void testOutbound() {
165165

166166
Message<?> message = MessageBuilder.withPayload("foo")
167167
.setHeader(KafkaHeaders.TOPIC, topic1)
168-
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
169-
.setHeader(KafkaHeaders.PARTITION_ID, 1)
168+
.setHeader(KafkaHeaders.KEY, 2)
169+
.setHeader(KafkaHeaders.PARTITION, 1)
170170
.build();
171171
handler.handleMessage(message);
172172

@@ -177,7 +177,7 @@ void testOutbound() {
177177

178178
message = MessageBuilder.withPayload("bar")
179179
.setHeader(KafkaHeaders.TOPIC, topic1)
180-
.setHeader(KafkaHeaders.PARTITION_ID, 0)
180+
.setHeader(KafkaHeaders.PARTITION, 0)
181181
.build();
182182
handler.handleMessage(message);
183183
record = KafkaTestUtils.getSingleRecord(consumer, topic1);
@@ -197,8 +197,8 @@ record = KafkaTestUtils.getSingleRecord(consumer, topic1);
197197

198198
message = MessageBuilder.withPayload(KafkaNull.INSTANCE)
199199
.setHeader(KafkaHeaders.TOPIC, topic1)
200-
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
201-
.setHeader(KafkaHeaders.PARTITION_ID, "1")
200+
.setHeader(KafkaHeaders.KEY, 2)
201+
.setHeader(KafkaHeaders.PARTITION, "1")
202202
.build();
203203
handler.handleMessage(message);
204204

@@ -221,8 +221,8 @@ void testOutboundWithTimestamp() {
221221

222222
Message<?> message = MessageBuilder.withPayload("foo")
223223
.setHeader(KafkaHeaders.TOPIC, topic2)
224-
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
225-
.setHeader(KafkaHeaders.PARTITION_ID, 1)
224+
.setHeader(KafkaHeaders.KEY, 2)
225+
.setHeader(KafkaHeaders.PARTITION, 1)
226226
.setHeader(KafkaHeaders.TIMESTAMP, 1487694048607L)
227227
.setHeader("baz", "qux")
228228
.build();
@@ -252,8 +252,8 @@ void testOutboundWithTimestampExpression() {
252252

253253
Message<?> message = MessageBuilder.withPayload("foo")
254254
.setHeader(KafkaHeaders.TOPIC, topic3)
255-
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
256-
.setHeader(KafkaHeaders.PARTITION_ID, 1)
255+
.setHeader(KafkaHeaders.KEY, 2)
256+
.setHeader(KafkaHeaders.PARTITION, 1)
257257
.build();
258258

259259
handler.setTimestampExpression(new ValueExpression<>(1487694048633L));
@@ -293,8 +293,8 @@ void testOutboundWithAsyncResults() {
293293

294294
Message<?> message = MessageBuilder.withPayload("foo")
295295
.setHeader(KafkaHeaders.TOPIC, topic4)
296-
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
297-
.setHeader(KafkaHeaders.PARTITION_ID, 1)
296+
.setHeader(KafkaHeaders.KEY, 2)
297+
.setHeader(KafkaHeaders.PARTITION, 1)
298298
.build();
299299
handler.handleMessage(message);
300300

@@ -326,7 +326,7 @@ protected ListenableFuture<SendResult<Integer, String>> doSend(
326326
handler.afterPropertiesSet();
327327
message = MessageBuilder.withPayload("bar")
328328
.setHeader(KafkaHeaders.TOPIC, "foo")
329-
.setHeader(KafkaHeaders.PARTITION_ID, 0)
329+
.setHeader(KafkaHeaders.PARTITION, 0)
330330
.build();
331331
handler.handleMessage(message);
332332

@@ -409,8 +409,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
409409
if (payload == null) {
410410
message = MessageBuilder.withPayload("foo")
411411
.setHeader(KafkaHeaders.TOPIC, topic5)
412-
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
413-
.setHeader(KafkaHeaders.PARTITION_ID, 1)
412+
.setHeader(KafkaHeaders.KEY, 2)
413+
.setHeader(KafkaHeaders.PARTITION, 1)
414414
.build();
415415
}
416416
else {
@@ -435,8 +435,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
435435

436436
final Message<?> messageToHandle1 = MessageBuilder.withPayload("foo")
437437
.setHeader(KafkaHeaders.TOPIC, topic5)
438-
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
439-
.setHeader(KafkaHeaders.PARTITION_ID, 1)
438+
.setHeader(KafkaHeaders.KEY, 2)
439+
.setHeader(KafkaHeaders.PARTITION, 1)
440440
.setHeader(KafkaHeaders.REPLY_TOPIC, "bad")
441441
.build();
442442

@@ -447,8 +447,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
447447

448448
final Message<?> messageToHandle2 = MessageBuilder.withPayload("foo")
449449
.setHeader(KafkaHeaders.TOPIC, topic5)
450-
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
451-
.setHeader(KafkaHeaders.PARTITION_ID, 1)
450+
.setHeader(KafkaHeaders.KEY, 2)
451+
.setHeader(KafkaHeaders.PARTITION, 1)
452452
.setHeader(KafkaHeaders.REPLY_PARTITION, 999)
453453
.build();
454454

spring-integration-kafka/src/test/kotlin/org/springframework/integration/kafka/dsl/kotlin/KafkaDslKotlinTests.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,8 @@ class KafkaDslKotlinTests {
151151
val acknowledgment = headers[KafkaHeaders.ACKNOWLEDGMENT] as Acknowledgment
152152
acknowledgment.acknowledge()
153153
assertThat(headers[KafkaHeaders.RECEIVED_TOPIC]).isEqualTo(TEST_TOPIC1)
154-
assertThat(headers[KafkaHeaders.RECEIVED_MESSAGE_KEY]).isEqualTo(i + 1)
155-
assertThat(headers[KafkaHeaders.RECEIVED_PARTITION_ID]).isEqualTo(0)
154+
assertThat(headers[KafkaHeaders.RECEIVED_KEY]).isEqualTo(i + 1)
155+
assertThat(headers[KafkaHeaders.RECEIVED_PARTITION]).isEqualTo(0)
156156
assertThat(headers[KafkaHeaders.OFFSET]).isEqualTo(i.toLong())
157157
assertThat(headers[KafkaHeaders.TIMESTAMP_TYPE]).isEqualTo("CREATE_TIME")
158158
assertThat(headers[KafkaHeaders.RECEIVED_TIMESTAMP]).isEqualTo(1487694048633L)
@@ -168,8 +168,8 @@ class KafkaDslKotlinTests {
168168
val acknowledgment = headers[KafkaHeaders.ACKNOWLEDGMENT] as Acknowledgment
169169
acknowledgment.acknowledge()
170170
assertThat(headers[KafkaHeaders.RECEIVED_TOPIC]).isEqualTo(TEST_TOPIC2)
171-
assertThat(headers[KafkaHeaders.RECEIVED_MESSAGE_KEY]).isEqualTo(i + 1)
172-
assertThat(headers[KafkaHeaders.RECEIVED_PARTITION_ID]).isEqualTo(0)
171+
assertThat(headers[KafkaHeaders.RECEIVED_KEY]).isEqualTo(i + 1)
172+
assertThat(headers[KafkaHeaders.RECEIVED_PARTITION]).isEqualTo(0)
173173
assertThat(headers[KafkaHeaders.OFFSET]).isEqualTo(i.toLong())
174174
assertThat(headers[KafkaHeaders.TIMESTAMP_TYPE]).isEqualTo("CREATE_TIME")
175175
assertThat(headers[KafkaHeaders.RECEIVED_TIMESTAMP]).isEqualTo(1487694048644L)
@@ -240,7 +240,7 @@ class KafkaDslKotlinTests {
240240
.errorChannel(errorChannel())
241241
.retryTemplate(RetryTemplate())
242242
.filterInRetry(true)) {
243-
filter<Message<*>>({ m -> (m.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY] as Int) < 101 }) { throwExceptionOnRejection(true) }
243+
filter<Message<*>>({ m -> (m.headers[KafkaHeaders.RECEIVED_KEY] as Int) < 101 }) { throwExceptionOnRejection(true) }
244244
transform<String> { it.uppercase() }
245245
channel { queue("listeningFromKafkaResults1") }
246246
}
@@ -254,7 +254,7 @@ class KafkaDslKotlinTests {
254254
.errorChannel(errorChannel())
255255
.retryTemplate(RetryTemplate())
256256
.filterInRetry(true)) {
257-
filter<Message<*>>({ m -> (m.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY] as Int) < 101 }) { throwExceptionOnRejection(true) }
257+
filter<Message<*>>({ m -> (m.headers[KafkaHeaders.RECEIVED_KEY] as Int) < 101 }) { throwExceptionOnRejection(true) }
258258
transform<String> { it.uppercase() }
259259
channel { queue("listeningFromKafkaResults2") }
260260
}

0 commit comments

Comments
 (0)