Skip to content

Commit 80243ed

Browse files
garyrussellartembilan
authored andcommitted
GH-2260: Reduce Header Constant Verbosity
Resolves #2260 **cherry-pick to main, removing 4 deprecated constants**
1 parent 4a5f4a1 commit 80243ed

File tree

16 files changed

+112
-79
lines changed

16 files changed

+112
-79
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,8 @@ Then, to use the template, you can invoke one of its methods.
275275
When you use the methods with a `Message<?>` parameter, the topic, partition, and key information is provided in a message header that includes the following items:
276276

277277
* `KafkaHeaders.TOPIC`
278-
* `KafkaHeaders.PARTITION_ID`
279-
* `KafkaHeaders.MESSAGE_KEY`
278+
* `KafkaHeaders.PARTITION`
279+
* `KafkaHeaders.KEY`
280280
* `KafkaHeaders.TIMESTAMP`
281281

282282
The message payload is the data.
@@ -831,7 +831,7 @@ In this example, we use the reply topic header from the request:
831831
public Message<?> messageReturn(String in) {
832832
return MessageBuilder.withPayload(in.toUpperCase())
833833
.setHeader(KafkaHeaders.TOPIC, replyTo)
834-
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
834+
.setHeader(KafkaHeaders.KEY, 42)
835835
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
836836
.build();
837837
}
@@ -850,7 +850,7 @@ It will also echo the incoming `KafkaHeaders.CORRELATION_ID` and `KafkaHeaders.R
850850
@SendTo // default REPLY_TOPIC header
851851
public Message<?> messageReturn(String in) {
852852
return MessageBuilder.withPayload(in.toUpperCase())
853-
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
853+
.setHeader(KafkaHeaders.KEY, 42)
854854
.build();
855855
}
856856
----
@@ -1474,13 +1474,13 @@ Finally, metadata about the record is available from message headers.
14741474
You can use the following header names to retrieve the headers of the message:
14751475

14761476
* `KafkaHeaders.OFFSET`
1477-
* `KafkaHeaders.RECEIVED_MESSAGE_KEY`
1477+
* `KafkaHeaders.RECEIVED_KEY`
14781478
* `KafkaHeaders.RECEIVED_TOPIC`
1479-
* `KafkaHeaders.RECEIVED_PARTITION_ID`
1479+
* `KafkaHeaders.RECEIVED_PARTITION`
14801480
* `KafkaHeaders.RECEIVED_TIMESTAMP`
14811481
* `KafkaHeaders.TIMESTAMP_TYPE`
14821482

1483-
Starting with version 2.5 the `RECEIVED_MESSAGE_KEY` is not present if the incoming record has a `null` key; previously the header was populated with a `null` value.
1483+
Starting with version 2.5 the `RECEIVED_KEY` is not present if the incoming record has a `null` key; previously the header was populated with a `null` value.
14841484
This change is to make the framework consistent with `spring-messaging` conventions where `null` valued headers are not present.
14851485

14861486
The following example shows how to use the headers:
@@ -1490,8 +1490,8 @@ The following example shows how to use the headers:
14901490
----
14911491
@KafkaListener(id = "qux", topicPattern = "myTopic1")
14921492
public void listen(@Payload String foo,
1493-
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
1494-
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
1493+
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
1494+
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
14951495
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
14961496
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
14971497
) {
@@ -1558,8 +1558,8 @@ The following example shows how to use the headers:
15581558
----
15591559
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
15601560
public void listen(List<String> list,
1561-
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
1562-
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
1561+
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
1562+
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
15631563
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
15641564
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
15651565
...
@@ -2160,7 +2160,7 @@ public class MultiListenerSendTo {
21602160
@KafkaHandler
21612161
@SendTo("!{'annotated25reply2'}")
21622162
public String bar(@Payload(required = false) KafkaNull nul,
2163-
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
2163+
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
21642164
...
21652165
}
21662166
@@ -2258,7 +2258,7 @@ public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] rep
22582258
@Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
22592259
return MessageBuilder.withPayload(in.toUpperCase())
22602260
.setHeader(KafkaHeaders.TOPIC, replyTo)
2261-
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
2261+
.setHeader(KafkaHeaders.KEY, 42)
22622262
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
22632263
.setHeader("someOtherHeader", "someValue")
22642264
.build();
@@ -4860,7 +4860,7 @@ The following example shows such a configuration:
48604860
[source, java]
48614861
----
48624862
@KafkaListener(id = "deletableListener", topics = "myTopic")
4863-
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
4863+
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
48644864
// value == null represents key deletion
48654865
}
48664866
----
@@ -4887,7 +4887,7 @@ static class MultiListenerBean {
48874887
}
48884888
48894889
@KafkaHandler
4890-
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
4890+
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
48914891
...
48924892
}
48934893
@@ -4973,7 +4973,7 @@ public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
49734973
MessageHeaders headers = m.getHeaders();
49744974
c.seek(new org.apache.kafka.common.TopicPartition(
49754975
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
4976-
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
4976+
headers.get(KafkaHeaders.RECEIVED_PARTITION, Integer.class)),
49774977
headers.get(KafkaHeaders.OFFSET, Long.class));
49784978
return null;
49794979
};
@@ -4992,7 +4992,7 @@ public ConsumerAwareListenerErrorHandler listen10ErrorHandler() {
49924992
this.listen10Exception = e;
49934993
MessageHeaders headers = m.getHeaders();
49944994
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
4995-
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
4995+
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION, List.class);
49964996
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
49974997
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
49984998
for (int i = 0; i < topics.size(); i++) {
@@ -5549,8 +5549,8 @@ For POJO batch listeners, starting with version 2.8.6, the header is copied into
55495549
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
55505550
info = "info for batch")
55515551
public void listen(List<Thing> list,
5552-
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
5553-
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
5552+
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
5553+
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
55545554
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
55555555
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
55565556
@Header(KafkaHeaders.LISTENER_INFO) String info) {

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,8 @@ public interface KafkaOperations<K, V> {
151151
* @param message the message to send.
152152
* @return a Future for the {@link SendResult}.
153153
* @see org.springframework.kafka.support.KafkaHeaders#TOPIC
154-
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID
155-
* @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY
154+
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION
155+
* @see org.springframework.kafka.support.KafkaHeaders#KEY
156156
*/
157157
ListenableFuture<SendResult<K, V>> send(Message<?> message);
158158

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -479,7 +479,7 @@ private Message<?> checkHeaders(Object result, String topic, Object source) { //
479479
boolean needsTopic = headers.get(KafkaHeaders.TOPIC) == null;
480480
boolean sourceIsMessage = source instanceof Message;
481481
boolean needsCorrelation = headers.get(KafkaHeaders.CORRELATION_ID) == null && sourceIsMessage;
482-
boolean needsPartition = headers.get(KafkaHeaders.PARTITION_ID) == null && sourceIsMessage
482+
boolean needsPartition = headers.get(KafkaHeaders.PARTITION) == null && sourceIsMessage
483483
&& getReplyPartition((Message<?>) source) != null;
484484
if (needsTopic || needsCorrelation || needsPartition) {
485485
MessageBuilder<?> builder = MessageBuilder.fromMessage(reply);
@@ -546,7 +546,7 @@ private void sendReplyForMessageSource(Object result, String topic, Object sourc
546546
private void setPartition(MessageBuilder<?> builder, Message<?> source) {
547547
byte[] partitionBytes = getReplyPartition(source);
548548
if (partitionBytes != null) {
549-
builder.setHeader(KafkaHeaders.PARTITION_ID, ByteBuffer.wrap(partitionBytes).getInt());
549+
builder.setHeader(KafkaHeaders.PARTITION, ByteBuffer.wrap(partitionBytes).getInt());
550550
}
551551
}
552552

spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingTransformer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -92,7 +92,7 @@ public KeyValue<K, R> transform(K key, V value) {
9292
headers.add(header);
9393
}
9494
});
95-
Object key2 = message.getHeaders().get(KafkaHeaders.MESSAGE_KEY);
95+
Object key2 = message.getHeaders().get(KafkaHeaders.KEY);
9696
return new KeyValue(key2 == null ? key : key2, message.getPayload());
9797
}
9898

spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,12 @@ public AbstractKafkaHeaderMapper(String... patterns) {
6868
this.matchers.add(new NeverMatchHeaderMatcher(
6969
KafkaHeaders.ACKNOWLEDGMENT,
7070
KafkaHeaders.CONSUMER,
71-
KafkaHeaders.MESSAGE_KEY,
71+
KafkaHeaders.KEY,
7272
KafkaHeaders.OFFSET,
73-
KafkaHeaders.PARTITION_ID,
73+
KafkaHeaders.PARTITION,
7474
KafkaHeaders.RAW_DATA,
75-
KafkaHeaders.RECEIVED_MESSAGE_KEY,
76-
KafkaHeaders.RECEIVED_PARTITION_ID,
75+
KafkaHeaders.RECEIVED_KEY,
76+
KafkaHeaders.RECEIVED_PARTITION,
7777
KafkaHeaders.RECEIVED_TIMESTAMP,
7878
KafkaHeaders.RECEIVED_TOPIC,
7979
KafkaHeaders.TIMESTAMP,

spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,30 @@ public abstract class KafkaHeaders {
4141
*/
4242
public static final String TOPIC = PREFIX + "topic";
4343

44+
/**
45+
* The header containing the record key when sending data to Kafka.
46+
* @since 2.9
47+
*/
48+
public static final String KEY = PREFIX + "messageKey";
49+
4450
/**
4551
* The header containing the message key when sending data to Kafka.
52+
* @deprecated in favor of {@link #KEY}.
4653
*/
54+
@Deprecated
4755
public static final String MESSAGE_KEY = PREFIX + "messageKey";
4856

4957
/**
5058
* The header containing the topic partition when sending data to Kafka.
59+
* @since 2.0
60+
*/
61+
public static final String PARTITION = PREFIX + "partitionId";
62+
63+
/**
64+
* The header containing the topic partition when sending data to Kafka.
65+
* @deprecated in favor of {@link #PARTITION}.
5166
*/
67+
@Deprecated
5268
public static final String PARTITION_ID = PREFIX + "partitionId";
5369

5470
/**
@@ -83,17 +99,34 @@ public abstract class KafkaHeaders {
8399
public static final String RECEIVED_TOPIC = RECEIVED + "Topic";
84100

85101
/**
86-
* The header containing the message key for the received message.
102+
* The header containing the record key from the received message.
103+
* @since 2.9
87104
*/
105+
public static final String RECEIVED_KEY = RECEIVED + "MessageKey";
106+
107+
/**
108+
* The header containing the record key from the received message.
109+
* @deprecated in favor of {@link #RECEIVED_KEY}.
110+
*/
111+
@Deprecated
88112
public static final String RECEIVED_MESSAGE_KEY = RECEIVED + "MessageKey";
89113

90114
/**
91-
* The header containing the topic partition for the received message.
115+
* The header containing the topic partition from the received message.
116+
* @since 2.9
117+
*/
118+
public static final String RECEIVED_PARTITION = RECEIVED + "PartitionId";
119+
120+
/**
121+
* The header containing the topic partition from the received message.
122+
* @deprecated in favor of {@link #RECEIVED_PARTITION}.
92123
*/
124+
@Deprecated
93125
public static final String RECEIVED_PARTITION_ID = RECEIVED + "PartitionId";
94126

95127
/**
96-
* The header for holding the {@link org.apache.kafka.common.record.TimestampType type} of timestamp.
128+
* The header for holding the {@link org.apache.kafka.common.record.TimestampType
129+
* type} of timestamp.
97130
*/
98131
public static final String TIMESTAMP_TYPE = PREFIX + "timestampType";
99132

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -61,12 +61,12 @@ default void commonHeaders(Acknowledgment acknowledgment, Consumer<?, ?> consume
6161
@Nullable Object timestampType, Object timestamp) {
6262

6363
rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topic);
64-
rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, partition);
64+
rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION, partition);
6565
rawHeaders.put(KafkaHeaders.OFFSET, offset);
6666
rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, timestampType);
6767
rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, timestamp);
6868
JavaUtils.INSTANCE
69-
.acceptIfNotNull(KafkaHeaders.RECEIVED_MESSAGE_KEY, theKey, (key, val) -> rawHeaders.put(key, val))
69+
.acceptIfNotNull(KafkaHeaders.RECEIVED_KEY, theKey, (key, val) -> rawHeaders.put(key, val))
7070
.acceptIfNotNull(KafkaHeaders.GROUP_ID, MessageConverter.getGroupId(),
7171
(key, val) -> rawHeaders.put(key, val))
7272
.acceptIfNotNull(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment, (key, val) -> rawHeaders.put(key, val))

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -227,8 +227,8 @@ else if (topicHeader == null) {
227227
throw new IllegalStateException(KafkaHeaders.TOPIC + " must be a String or byte[], not "
228228
+ topicHeader.getClass());
229229
}
230-
Integer partition = headers.get(KafkaHeaders.PARTITION_ID, Integer.class);
231-
Object key = headers.get(KafkaHeaders.MESSAGE_KEY);
230+
Integer partition = headers.get(KafkaHeaders.PARTITION, Integer.class);
231+
Object key = headers.get(KafkaHeaders.KEY);
232232
Object payload = convertPayload(message);
233233
Long timestamp = headers.get(KafkaHeaders.TIMESTAMP, Long.class);
234234
Headers recordHeaders = initialRecordHeaders(message);

spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -266,7 +266,7 @@ public KafkaListenerContainerFactory<?> getContainerFactory() {
266266
containerFactory = "#{__listener.containerFactory}")
267267
// @SendTo("foo") test WARN log for void return
268268
public void listen1(List<Foo> foos, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
269-
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions) {
269+
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions) {
270270
if (this.received == null) {
271271
this.received = foos;
272272
}
@@ -318,7 +318,7 @@ public Collection<Message<?>> listen1(List<Foo> foos) {
318318
}
319319
return foos.stream().map(f -> MessageBuilder.withPayload(new Foo(f.getBar().toUpperCase()))
320320
.setHeader(KafkaHeaders.TOPIC, "blc5")
321-
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
321+
.setHeader(KafkaHeaders.KEY, 42)
322322
.build())
323323
.collect(Collectors.toList());
324324
}

0 commit comments

Comments
 (0)