Skip to content

GH-2260: Reduce Header Constant Verbosity #2261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ Then, to use the template, you can invoke one of its methods.
When you use the methods with a `Message<?>` parameter, the topic, partition, and key information is provided in a message header that includes the following items:

* `KafkaHeaders.TOPIC`
* `KafkaHeaders.PARTITION_ID`
* `KafkaHeaders.MESSAGE_KEY`
* `KafkaHeaders.PARTITION`
* `KafkaHeaders.KEY`
* `KafkaHeaders.TIMESTAMP`

The message payload is the data.
Expand Down Expand Up @@ -831,7 +831,7 @@ In this example, we use the reply topic header from the request:
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
Expand All @@ -850,7 +850,7 @@ It will also echo the incoming `KafkaHeaders.CORRELATION_ID` and `KafkaHeaders.R
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
.setHeader(KafkaHeaders.KEY, 42)
.build();
}
----
Expand Down Expand Up @@ -1474,13 +1474,13 @@ Finally, metadata about the record is available from message headers.
You can use the following header names to retrieve the headers of the message:

* `KafkaHeaders.OFFSET`
* `KafkaHeaders.RECEIVED_MESSAGE_KEY`
* `KafkaHeaders.RECEIVED_KEY`
* `KafkaHeaders.RECEIVED_TOPIC`
* `KafkaHeaders.RECEIVED_PARTITION_ID`
* `KafkaHeaders.RECEIVED_PARTITION`
* `KafkaHeaders.RECEIVED_TIMESTAMP`
* `KafkaHeaders.TIMESTAMP_TYPE`

Starting with version 2.5 the `RECEIVED_MESSAGE_KEY` is not present if the incoming record has a `null` key; previously the header was populated with a `null` value.
Starting with version 2.5 the `RECEIVED_KEY` is not present if the incoming record has a `null` key; previously the header was populated with a `null` value.
This change is to make the framework consistent with `spring-messaging` conventions where `null` valued headers are not present.

The following example shows how to use the headers:
Expand All @@ -1490,8 +1490,8 @@ The following example shows how to use the headers:
----
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
Expand Down Expand Up @@ -1558,8 +1558,8 @@ The following example shows how to use the headers:
----
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
Expand Down Expand Up @@ -2160,7 +2160,7 @@ public class MultiListenerSendTo {
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
Expand Down Expand Up @@ -2258,7 +2258,7 @@ public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] rep
@Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.setHeader("someOtherHeader", "someValue")
.build();
Expand Down Expand Up @@ -4860,7 +4860,7 @@ The following example shows such a configuration:
[source, java]
----
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
----
Expand All @@ -4887,7 +4887,7 @@ static class MultiListenerBean {
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
Expand Down Expand Up @@ -4973,7 +4973,7 @@ public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
MessageHeaders headers = m.getHeaders();
c.seek(new org.apache.kafka.common.TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
headers.get(KafkaHeaders.RECEIVED_PARTITION, Integer.class)),
headers.get(KafkaHeaders.OFFSET, Long.class));
return null;
};
Expand All @@ -4992,7 +4992,7 @@ public ConsumerAwareListenerErrorHandler listen10ErrorHandler() {
this.listen10Exception = e;
MessageHeaders headers = m.getHeaders();
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION, List.class);
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
for (int i = 0; i < topics.size(); i++) {
Expand Down Expand Up @@ -5549,8 +5549,8 @@ For POJO batch listeners, starting with version 2.8.6, the header is copied into
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ public interface KafkaOperations<K, V> {
* @param message the message to send.
* @return a Future for the {@link SendResult}.
* @see org.springframework.kafka.support.KafkaHeaders#TOPIC
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID
* @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION
* @see org.springframework.kafka.support.KafkaHeaders#KEY
*/
ListenableFuture<SendResult<K, V>> send(Message<?> message);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -479,7 +479,7 @@ private Message<?> checkHeaders(Object result, String topic, Object source) { //
boolean needsTopic = headers.get(KafkaHeaders.TOPIC) == null;
boolean sourceIsMessage = source instanceof Message;
boolean needsCorrelation = headers.get(KafkaHeaders.CORRELATION_ID) == null && sourceIsMessage;
boolean needsPartition = headers.get(KafkaHeaders.PARTITION_ID) == null && sourceIsMessage
boolean needsPartition = headers.get(KafkaHeaders.PARTITION) == null && sourceIsMessage
&& getReplyPartition((Message<?>) source) != null;
if (needsTopic || needsCorrelation || needsPartition) {
MessageBuilder<?> builder = MessageBuilder.fromMessage(reply);
Expand Down Expand Up @@ -546,7 +546,7 @@ private void sendReplyForMessageSource(Object result, String topic, Object sourc
private void setPartition(MessageBuilder<?> builder, Message<?> source) {
byte[] partitionBytes = getReplyPartition(source);
if (partitionBytes != null) {
builder.setHeader(KafkaHeaders.PARTITION_ID, ByteBuffer.wrap(partitionBytes).getInt());
builder.setHeader(KafkaHeaders.PARTITION, ByteBuffer.wrap(partitionBytes).getInt());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -92,7 +92,7 @@ public KeyValue<K, R> transform(K key, V value) {
headers.add(header);
}
});
Object key2 = message.getHeaders().get(KafkaHeaders.MESSAGE_KEY);
Object key2 = message.getHeaders().get(KafkaHeaders.KEY);
return new KeyValue(key2 == null ? key : key2, message.getPayload());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ public AbstractKafkaHeaderMapper(String... patterns) {
this.matchers.add(new NeverMatchHeaderMatcher(
KafkaHeaders.ACKNOWLEDGMENT,
KafkaHeaders.CONSUMER,
KafkaHeaders.MESSAGE_KEY,
KafkaHeaders.KEY,
KafkaHeaders.OFFSET,
KafkaHeaders.PARTITION_ID,
KafkaHeaders.PARTITION,
KafkaHeaders.RAW_DATA,
KafkaHeaders.RECEIVED_MESSAGE_KEY,
KafkaHeaders.RECEIVED_PARTITION_ID,
KafkaHeaders.RECEIVED_KEY,
KafkaHeaders.RECEIVED_PARTITION,
KafkaHeaders.RECEIVED_TIMESTAMP,
KafkaHeaders.RECEIVED_TOPIC,
KafkaHeaders.TIMESTAMP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,30 @@ public abstract class KafkaHeaders {
*/
public static final String TOPIC = PREFIX + "topic";

/**
* The header containing the record key when sending data to Kafka.
* @since 2.9
*/
public static final String KEY = PREFIX + "messageKey";

/**
* The header containing the message key when sending data to Kafka.
* @deprecated in favor of {@link #KEY}.
*/
@Deprecated
public static final String MESSAGE_KEY = PREFIX + "messageKey";

/**
* The header containing the topic partition when sending data to Kafka.
* @since 2.0
*/
public static final String PARTITION = PREFIX + "partitionId";

/**
* The header containing the topic partition when sending data to Kafka.
* @deprecated in favor of {@link #PARTITION}.
*/
@Deprecated
public static final String PARTITION_ID = PREFIX + "partitionId";

/**
Expand Down Expand Up @@ -83,17 +99,34 @@ public abstract class KafkaHeaders {
public static final String RECEIVED_TOPIC = RECEIVED + "Topic";

/**
* The header containing the message key for the received message.
* The header containing the record key from the received message.
* @since 2.9
*/
public static final String RECEIVED_KEY = RECEIVED + "MessageKey";

/**
* The header containing the record key from the received message.
* @deprecated in favor of {@link #RECEIVED_KEY}.
*/
@Deprecated
public static final String RECEIVED_MESSAGE_KEY = RECEIVED + "MessageKey";

/**
* The header containing the topic partition for the received message.
* The header containing the topic partition from the received message.
* @since 2.9
*/
public static final String RECEIVED_PARTITION = RECEIVED + "PartitionId";

/**
* The header containing the topic partition from the received message.
* @deprecated in favor of {@link #RECEIVED_PARTITION}.
*/
@Deprecated
public static final String RECEIVED_PARTITION_ID = RECEIVED + "PartitionId";

/**
* The header for holding the {@link org.apache.kafka.common.record.TimestampType type} of timestamp.
* The header for holding the {@link org.apache.kafka.common.record.TimestampType
* type} of timestamp.
*/
public static final String TIMESTAMP_TYPE = PREFIX + "timestampType";

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,12 +61,12 @@ default void commonHeaders(Acknowledgment acknowledgment, Consumer<?, ?> consume
@Nullable Object timestampType, Object timestamp) {

rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topic);
rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, partition);
rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION, partition);
rawHeaders.put(KafkaHeaders.OFFSET, offset);
rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, timestampType);
rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, timestamp);
JavaUtils.INSTANCE
.acceptIfNotNull(KafkaHeaders.RECEIVED_MESSAGE_KEY, theKey, (key, val) -> rawHeaders.put(key, val))
.acceptIfNotNull(KafkaHeaders.RECEIVED_KEY, theKey, (key, val) -> rawHeaders.put(key, val))
.acceptIfNotNull(KafkaHeaders.GROUP_ID, MessageConverter.getGroupId(),
(key, val) -> rawHeaders.put(key, val))
.acceptIfNotNull(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment, (key, val) -> rawHeaders.put(key, val))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -227,8 +227,8 @@ else if (topicHeader == null) {
throw new IllegalStateException(KafkaHeaders.TOPIC + " must be a String or byte[], not "
+ topicHeader.getClass());
}
Integer partition = headers.get(KafkaHeaders.PARTITION_ID, Integer.class);
Object key = headers.get(KafkaHeaders.MESSAGE_KEY);
Integer partition = headers.get(KafkaHeaders.PARTITION, Integer.class);
Object key = headers.get(KafkaHeaders.KEY);
Object payload = convertPayload(message);
Long timestamp = headers.get(KafkaHeaders.TIMESTAMP, Long.class);
Headers recordHeaders = initialRecordHeaders(message);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -266,7 +266,7 @@ public KafkaListenerContainerFactory<?> getContainerFactory() {
containerFactory = "#{__listener.containerFactory}")
// @SendTo("foo") test WARN log for void return
public void listen1(List<Foo> foos, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions) {
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions) {
if (this.received == null) {
this.received = foos;
}
Expand Down Expand Up @@ -318,7 +318,7 @@ public Collection<Message<?>> listen1(List<Foo> foos) {
}
return foos.stream().map(f -> MessageBuilder.withPayload(new Foo(f.getBar().toUpperCase()))
.setHeader(KafkaHeaders.TOPIC, "blc5")
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
.setHeader(KafkaHeaders.KEY, 42)
.build())
.collect(Collectors.toList());
}
Expand Down
Loading