Skip to content

Commit 08ffcbf

Browse files
committed
spring-projectsGH-2423: Upgrade to Kafka 3.3.1, Streams Proc. API
Resolves spring-projects#2423 Upgrade Apache Kafka version to 3.3.1. Migrate transformers to the new processor API.
1 parent f408490 commit 08ffcbf

File tree

11 files changed

+252
-35
lines changed

11 files changed

+252
-35
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ ext {
6464
jaywayJsonPathVersion = '2.6.0'
6565
junit4Version = '4.13.2'
6666
junitJupiterVersion = '5.9.0'
67-
kafkaVersion = '3.2.3'
67+
kafkaVersion = '3.3.1'
6868
log4jVersion = '2.18.0'
6969
micrometerVersion = '1.10.0-M6'
7070
micrometerTracingVersion = '1.0.0-M8'

spring-kafka-docs/src/main/asciidoc/streams.adoc

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ Starting with version 2.7, the default is to never clean up local state.
239239
[[streams-header-enricher]]
240240
==== Header Enricher
241241

242-
Version 2.3 added the `HeaderEnricher` implementation of `Transformer`.
242+
Version 3.0 added the `HeaderEnricherProcessor` extension of `ContextualProcessor`; providing the same functionality as the deprecated `HeaderEnricher` which implemented the deprecated `Transformer` interface.
243243
This can be used to add headers within the stream processing; the header values are SpEL expressions; the root object of the expression evaluation has 3 properties:
244244

245245
* `context` - the `ProcessorContext`, allowing access to the current record metadata
@@ -253,18 +253,18 @@ To use the enricher within a stream:
253253
====
254254
[source, java]
255255
----
256-
.transform(() -> enricher)
256+
.process(() -> new HeaderEnricherProcessor(expressions))
257257
----
258258
====
259259

260-
The transformer does not change the `key` or `value`; it simply adds headers.
260+
The processor does not change the `key` or `value`; it simply adds headers.
261261

262-
IMPORTANT: If your stream is multi-threaded, you need a new instance for each record.
262+
IMPORTANT: You need a new instance for each record.
263263

264264
====
265265
[source, java]
266266
----
267-
.transform(() -> new HeaderEnricher<..., ...>(expressionMap))
267+
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
268268
----
269269
====
270270

@@ -276,19 +276,20 @@ Here is a simple example, adding one literal header and one variable:
276276
Map<String, Expression> headers = new HashMap<>();
277277
headers.put("header1", new LiteralExpression("value1"));
278278
SpelExpressionParser parser = new SpelExpressionParser();
279-
headers.put("header2", parser.parseExpression("context.timestamp() + ' @' + context.offset()"));
280-
HeaderEnricher<String, String> enricher = new HeaderEnricher<>(headers);
279+
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
280+
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String> enricher = new HeaderEnricher<>(headers);
281281
KStream<String, String> stream = builder.stream(INPUT);
282282
stream
283-
.transform(() -> enricher)
283+
.process(() -> supplier)
284284
.to(OUTPUT);
285285
----
286286
====
287287

288288
[[streams-messaging]]
289-
==== `MessagingTransformer`
289+
==== `MessagingProcessor`
290290

291-
Version 2.3 added the `MessagingTransformer` this allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow.
291+
Version 3.0 added the `MessagingProcessor` extension of `ContextualProcessor`; providing the same functionality as the deprecated `MessagingTransformer` which implemented the deprecated `Transformer` interface.
292+
This allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow.
292293
The transformer requires an implementation of `MessagingFunction`.
293294

294295
====

spring-kafka/src/main/java/org/springframework/kafka/aot/KafkaRuntimeHints.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import org.apache.kafka.clients.consumer.StickyAssignor;
2828
import org.apache.kafka.clients.producer.Producer;
2929
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
30-
import org.apache.kafka.clients.producer.UniformStickyPartitioner;
31-
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
3230
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
3331
import org.apache.kafka.common.protocol.Message;
3432
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -97,6 +95,7 @@
9795
*/
9896
public class KafkaRuntimeHints implements RuntimeHintsRegistrar {
9997

98+
@SuppressWarnings("deprecation")
10099
@Override
101100
public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader) {
102101
ReflectionHints reflectionHints = hints.reflection();
@@ -147,9 +146,9 @@ public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader)
147146
RoundRobinAssignor.class,
148147
StickyAssignor.class,
149148
// standard partitioners
150-
DefaultPartitioner.class,
149+
org.apache.kafka.clients.producer.internals.DefaultPartitioner.class,
151150
RoundRobinPartitioner.class,
152-
UniformStickyPartitioner.class,
151+
org.apache.kafka.clients.producer.UniformStickyPartitioner.class,
153152
// standard serialization
154153
ByteArrayDeserializer.class,
155154
ByteArraySerializer.class,

spring-kafka/src/main/java/org/springframework/kafka/streams/HeaderEnricher.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@
3636
*
3737
* @author Gary Russell
3838
* @since 2.3
39+
* @deprecated in favor of {@link HeaderEnricherProcessor}.
3940
*
4041
*/
42+
@Deprecated
4143
public class HeaderEnricher<K, V> implements Transformer<K, V, KeyValue<K, V>> {
4244

4345
private final Map<String, Expression> headerExpressions = new HashMap<>();
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2019-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.streams;
18+
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
23+
import org.apache.kafka.common.header.Headers;
24+
import org.apache.kafka.common.header.internals.RecordHeader;
25+
import org.apache.kafka.streams.processor.api.ContextualProcessor;
26+
import org.apache.kafka.streams.processor.api.ProcessorContext;
27+
import org.apache.kafka.streams.processor.api.Record;
28+
29+
import org.springframework.expression.Expression;
30+
31+
/**
32+
* Manipulate the headers.
33+
*
34+
* @param <K> the input key type.
35+
* @param <V> the input value type.
36+
*
37+
* @author Gary Russell
38+
* @since 3.0
39+
*
40+
*/
41+
public class HeaderEnricherProcessor<K, V> extends ContextualProcessor<K, V, K, V> {
42+
43+
private final Map<String, Expression> headerExpressions = new HashMap<>();
44+
45+
/**
46+
* Construct an instance with the provided header expressions.
47+
* @param headerExpressions the header expressions; name:expression.
48+
*/
49+
public HeaderEnricherProcessor(Map<String, Expression> headerExpressions) {
50+
this.headerExpressions.putAll(headerExpressions);
51+
}
52+
53+
@Override
54+
public void process(Record<K, V> record) {
55+
Headers headers = record.headers();
56+
Container<K, V> container = new Container<>(context(), record.key(), record.value(), record);
57+
this.headerExpressions.forEach((name, expression) -> {
58+
Object headerValue = expression.getValue(container);
59+
if (headerValue instanceof String) {
60+
headerValue = ((String) headerValue).getBytes(StandardCharsets.UTF_8);
61+
}
62+
else if (!(headerValue instanceof byte[])) {
63+
throw new IllegalStateException("Invalid header value type: " + headerValue.getClass());
64+
}
65+
headers.add(new RecordHeader(name, (byte[]) headerValue));
66+
});
67+
context().forward(record);
68+
}
69+
70+
@Override
71+
public void close() {
72+
// NO-OP
73+
}
74+
75+
/**
76+
* Container object for SpEL evaluation.
77+
*
78+
* @param <K> the key type.
79+
* @param <V> the value type.
80+
*
81+
*/
82+
public static final class Container<K, V> {
83+
84+
private final ProcessorContext<K, V> context;
85+
86+
private final K key;
87+
88+
private final V value;
89+
90+
private final Record record;
91+
92+
Container(ProcessorContext<K, V> context, K key, V value, Record record) {
93+
this.context = context;
94+
this.key = key;
95+
this.value = value;
96+
this.record = record;
97+
}
98+
99+
public ProcessorContext getContext() {
100+
return this.context;
101+
}
102+
103+
public K getKey() {
104+
return this.key;
105+
}
106+
107+
public V getValue() {
108+
return this.value;
109+
}
110+
111+
public Record getRecord() {
112+
return this.record;
113+
}
114+
115+
}
116+
117+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2019-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.streams.messaging;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.Optional;
22+
23+
import org.apache.kafka.clients.consumer.ConsumerRecord;
24+
import org.apache.kafka.clients.producer.ProducerRecord;
25+
import org.apache.kafka.common.header.Headers;
26+
import org.apache.kafka.common.record.TimestampType;
27+
import org.apache.kafka.streams.kstream.Transformer;
28+
import org.apache.kafka.streams.processor.api.ContextualProcessor;
29+
import org.apache.kafka.streams.processor.api.ProcessorContext;
30+
import org.apache.kafka.streams.processor.api.Record;
31+
import org.apache.kafka.streams.processor.api.RecordMetadata;
32+
33+
import org.springframework.kafka.support.KafkaHeaders;
34+
import org.springframework.kafka.support.converter.MessagingMessageConverter;
35+
import org.springframework.messaging.Message;
36+
import org.springframework.util.Assert;
37+
38+
/**
39+
* A {@link Transformer} implementation that invokes a {@link MessagingFunction}
40+
* converting to/from spring-messaging {@link Message}. Can be used, for example,
41+
* to invoke a Spring Integration flow.
42+
*
43+
* @param <Kin> the input key type.
44+
* @param <Vin> the input value type.
45+
* @param <Kout> the output key type.
46+
* @param <Vout> the output value type.
47+
*
48+
* @author Gary Russell
49+
* @since 2.3
50+
*
51+
*/
52+
public class MessagingProcessor<Kin, Vin, Kout, Vout> extends ContextualProcessor<Kin, Vin, Kout, Vout> {
53+
54+
private final MessagingFunction function;
55+
56+
private final MessagingMessageConverter converter;
57+
58+
/**
59+
* Construct an instance with the provided function and converter.
60+
* @param function the function.
61+
* @param converter the converter.
62+
*/
63+
public MessagingProcessor(MessagingFunction function, MessagingMessageConverter converter) {
64+
Assert.notNull(function, "'function' cannot be null");
65+
Assert.notNull(converter, "'converter' cannot be null");
66+
this.function = function;
67+
this.converter = converter;
68+
}
69+
70+
@SuppressWarnings("unchecked")
71+
@Override
72+
public void process(Record<Kin, Vin> record) {
73+
ProcessorContext<Kout, Vout> context = context();
74+
RecordMetadata meta = context.recordMetadata().orElse(null);
75+
Assert.state(meta != null, "No record metadata present");
76+
Headers headers = record.headers();
77+
ConsumerRecord<Object, Object> rebuilt = new ConsumerRecord<Object, Object>(meta.topic(),
78+
meta.partition(), meta.offset(),
79+
record.timestamp(), TimestampType.NO_TIMESTAMP_TYPE,
80+
0, 0,
81+
record.key(), record.value(),
82+
headers, Optional.empty());
83+
Message<?> message = this.converter.toMessage(rebuilt, null, null, null);
84+
message = this.function.exchange(message);
85+
List<String> headerList = new ArrayList<>();
86+
headers.forEach(header -> headerList.add(header.key()));
87+
headerList.forEach(name -> headers.remove(name));
88+
ProducerRecord<?, ?> fromMessage = this.converter.fromMessage(message, "dummy");
89+
fromMessage.headers().forEach(header -> {
90+
if (!header.key().equals(KafkaHeaders.TOPIC)) {
91+
headers.add(header);
92+
}
93+
});
94+
context.forward(new Record<>((Kout) message.getHeaders().get(KafkaHeaders.KEY), (Vout) message.getPayload(),
95+
record.timestamp(), headers));
96+
}
97+
98+
@Override
99+
public void close() {
100+
// NO-OP
101+
}
102+
103+
}

spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingTransformer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,10 @@
4444
*
4545
* @author Gary Russell
4646
* @since 2.3
47+
* @deprecated in favor of {@link MessagingProcessor}.
4748
*
4849
*/
50+
@Deprecated
4951
public class MessagingTransformer<K, V, R> implements Transformer<K, V, KeyValue<K, R>> {
5052

5153
private final MessagingFunction function;

spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,14 +27,14 @@
2727
import org.apache.kafka.clients.consumer.ConsumerRecord;
2828
import org.apache.kafka.common.serialization.Serdes;
2929
import org.apache.kafka.streams.KafkaStreams;
30-
import org.apache.kafka.streams.KeyValue;
3130
import org.apache.kafka.streams.StreamsBuilder;
3231
import org.apache.kafka.streams.StreamsConfig;
3332
import org.apache.kafka.streams.Topology;
3433
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
3534
import org.apache.kafka.streams.kstream.KStream;
36-
import org.apache.kafka.streams.kstream.Transformer;
3735
import org.apache.kafka.streams.processor.ProcessorContext;
36+
import org.apache.kafka.streams.processor.api.ContextualProcessor;
37+
import org.apache.kafka.streams.processor.api.Record;
3838
import org.apache.kafka.streams.state.StoreBuilder;
3939
import org.apache.kafka.streams.state.Stores;
4040
import org.junit.jupiter.api.Test;
@@ -174,19 +174,12 @@ public KStream<String, String> testStream(StreamsBuilder kStreamBuilder) {
174174
KStream<String, String> stream = kStreamBuilder.stream("test_topic");
175175

176176
stream
177-
.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
178-
@Override
179-
public void init(ProcessorContext context) {
180-
}
177+
.process(() -> new ContextualProcessor<String, String, String, String>() {
181178

182179
@Override
183-
public KeyValue<String, String> transform(String key, String value) {
184-
return null;
180+
public void process(Record<String, String> record) {
185181
}
186182

187-
@Override
188-
public void close() {
189-
}
190183
}, "testStateStore")
191184
.to("test_output");
192185

0 commit comments

Comments
 (0)