
Commit e9ad934

GH-2423: Upgrade to Kafka 3.3.1, Streams Proc. API

Resolves spring-projects#2423

Upgrade Apache Kafka version to 3.3.1. Migrate transformers to the new processor API.
1 parent f408490 commit e9ad934
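
For orientation before the file-by-file diff below, here is a minimal before/after sketch of the migration this commit performs (illustrative only, not code from the commit): the deprecated Transformer contract, with its separate init/transform/close callbacks, is replaced by the new processor API's ContextualProcessor, which has a single process(Record) callback and forwards results through its context.

// Old style (deprecated): org.apache.kafka.streams.kstream.Transformer
stream.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {

	@Override
	public void init(ProcessorContext context) {
		// no state needed in this sketch
	}

	@Override
	public KeyValue<String, String> transform(String key, String value) {
		return KeyValue.pair(key, value.toUpperCase());
	}

	@Override
	public void close() {
	}

});

// New style: org.apache.kafka.streams.processor.api.ContextualProcessor
stream.process(() -> new ContextualProcessor<String, String, String, String>() {

	@Override
	public void process(Record<String, String> record) {
		// forward a copy of the record carrying the transformed value
		context().forward(record.withValue(record.value().toUpperCase()));
	}

});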

10 files changed: +241 additions, −25 deletions

build.gradle

Lines changed: 1 addition & 1 deletion
@@ -64,7 +64,7 @@ ext {
 	jaywayJsonPathVersion = '2.6.0'
 	junit4Version = '4.13.2'
 	junitJupiterVersion = '5.9.0'
-	kafkaVersion = '3.2.3'
+	kafkaVersion = '3.3.1'
 	log4jVersion = '2.18.0'
 	micrometerVersion = '1.10.0-M6'
 	micrometerTracingVersion = '1.0.0-M8'

spring-kafka/src/main/java/org/springframework/kafka/aot/KafkaRuntimeHints.java

Lines changed: 3 additions & 4 deletions
@@ -27,8 +27,6 @@
 import org.apache.kafka.clients.consumer.StickyAssignor;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.RoundRobinPartitioner;
-import org.apache.kafka.clients.producer.UniformStickyPartitioner;
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.protocol.Message;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -97,6 +95,7 @@
  */
 public class KafkaRuntimeHints implements RuntimeHintsRegistrar {
 
+	@SuppressWarnings("deprecation")
 	@Override
 	public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader) {
 		ReflectionHints reflectionHints = hints.reflection();
@@ -147,9 +146,9 @@ public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader)
 				RoundRobinAssignor.class,
 				StickyAssignor.class,
 				// standard partitioners
-				DefaultPartitioner.class,
+				org.apache.kafka.clients.producer.internals.DefaultPartitioner.class,
 				RoundRobinPartitioner.class,
-				UniformStickyPartitioner.class,
+				org.apache.kafka.clients.producer.UniformStickyPartitioner.class,
 				// standard serialization
 				ByteArrayDeserializer.class,
 				ByteArraySerializer.class,

spring-kafka/src/main/java/org/springframework/kafka/streams/HeaderEnricher.java

Lines changed: 2 additions & 0 deletions
@@ -36,8 +36,10 @@
  *
  * @author Gary Russell
  * @since 2.3
+ * @deprecated in favor of {@link HeaderEnricherProcessor}.
  *
  */
+@Deprecated
 public class HeaderEnricher<K, V> implements Transformer<K, V, KeyValue<K, V>> {
 
 	private final Map<String, Expression> headerExpressions = new HashMap<>();

spring-kafka/src/main/java/org/springframework/kafka/streams/HeaderEnricherProcessor.java

Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2019-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.streams;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+
+import org.springframework.expression.Expression;
+
+/**
+ * Manipulate the headers.
+ *
+ * @param <K> the input key type.
+ * @param <V> the input value type.
+ *
+ * @author Gary Russell
+ * @since 3.0
+ *
+ */
+public class HeaderEnricherProcessor<K, V> extends ContextualProcessor<K, V, K, V> {
+
+	private final Map<String, Expression> headerExpressions = new HashMap<>();
+
+	/**
+	 * Construct an instance with the provided header expressions.
+	 * @param headerExpressions the header expressions; name:expression.
+	 */
+	public HeaderEnricherProcessor(Map<String, Expression> headerExpressions) {
+		this.headerExpressions.putAll(headerExpressions);
+	}
+
+	@Override
+	public void process(Record<K, V> record) {
+		Headers headers = record.headers();
+		Container<K, V> container = new Container<>(context(), record.key(), record.value(), record);
+		this.headerExpressions.forEach((name, expression) -> {
+			Object headerValue = expression.getValue(container);
+			if (headerValue instanceof String) {
+				headerValue = ((String) headerValue).getBytes(StandardCharsets.UTF_8);
+			}
+			else if (!(headerValue instanceof byte[])) {
+				throw new IllegalStateException("Invalid header value type: " + headerValue.getClass());
+			}
+			headers.add(new RecordHeader(name, (byte[]) headerValue));
+		});
+		context().forward(record);
+	}
+
+	@Override
+	public void close() {
+		// NO-OP
+	}
+
+	/**
+	 * Container object for SpEL evaluation.
+	 *
+	 * @param <K> the key type.
+	 * @param <V> the value type.
+	 *
+	 */
+	public static final class Container<K, V> {
+
+		private final ProcessorContext<K, V> context;
+
+		private final K key;
+
+		private final V value;
+
+		private final Record record;
+
+		Container(ProcessorContext<K, V> context, K key, V value, Record record) {
+			this.context = context;
+			this.key = key;
+			this.value = value;
+			this.record = record;
+		}
+
+		public ProcessorContext getContext() {
+			return this.context;
+		}
+
+		public K getKey() {
+			return this.key;
+		}
+
+		public V getValue() {
+			return this.value;
+		}
+
+		public Record getRecord() {
+			return this.record;
+		}
+
+	}
+
+}
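
A minimal usage sketch for the new processor (topic names, header names, and expressions here are hypothetical, not from this commit): wire it in with KStream.process, and each SpEL expression is evaluated against the Container root object (key, value, record, context).

Map<String, Expression> headerExpressions = new HashMap<>();
// literal header value
headerExpressions.put("appId", new LiteralExpression("header-enricher-demo"));
// SpEL evaluated against the Container root object; result must be a String or byte[]
headerExpressions.put("stamped",
		new SpelExpressionParser().parseExpression("key + '@' + record.timestamp()"));

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("words-in");
stream
		.process(() -> new HeaderEnricherProcessor<>(headerExpressions))
		.to("words-out");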

spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingProcessor.java

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2019-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.streams.messaging;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.converter.MessagingMessageConverter;
+import org.springframework.messaging.Message;
+import org.springframework.util.Assert;
+
+/**
+ * A {@link Transformer} implementation that invokes a {@link MessagingFunction}
+ * converting to/from spring-messaging {@link Message}. Can be used, for example,
+ * to invoke a Spring Integration flow.
+ *
+ * @param <Kin> the input key type.
+ * @param <Vin> the input value type.
+ * @param <Kout> the output key type.
+ * @param <Vout> the output value type.
+ *
+ * @author Gary Russell
+ * @since 2.3
+ *
+ */
+public class MessagingProcessor<Kin, Vin, Kout, Vout> extends ContextualProcessor<Kin, Vin, Kout, Vout> {
+
+	private final MessagingFunction function;
+
+	private final MessagingMessageConverter converter;
+
+	/**
+	 * Construct an instance with the provided function and converter.
+	 * @param function the function.
+	 * @param converter the converter.
+	 */
+	public MessagingProcessor(MessagingFunction function, MessagingMessageConverter converter) {
+		Assert.notNull(function, "'function' cannot be null");
+		Assert.notNull(converter, "'converter' cannot be null");
+		this.function = function;
+		this.converter = converter;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void process(Record<Kin, Vin> record) {
+		ProcessorContext<Kout, Vout> context = context();
+		RecordMetadata meta = context.recordMetadata().orElse(null);
+		Assert.state(meta != null, "No record metadata present");
+		Headers headers = record.headers();
+		ConsumerRecord<Object, Object> rebuilt = new ConsumerRecord<Object, Object>(meta.topic(),
+				meta.partition(), meta.offset(),
+				record.timestamp(), TimestampType.NO_TIMESTAMP_TYPE,
+				0, 0,
+				record.key(), record.value(),
+				headers, Optional.empty());
+		Message<?> message = this.converter.toMessage(rebuilt, null, null, null);
+		message = this.function.exchange(message);
+		List<String> headerList = new ArrayList<>();
+		headers.forEach(header -> headerList.add(header.key()));
+		headerList.forEach(name -> headers.remove(name));
+		ProducerRecord<?, ?> fromMessage = this.converter.fromMessage(message, "dummy");
+		fromMessage.headers().forEach(header -> {
+			if (!header.key().equals(KafkaHeaders.TOPIC)) {
+				headers.add(header);
+			}
+		});
+		context.forward(new Record<>((Kout) message.getHeaders().get(KafkaHeaders.KEY), (Vout) message.getPayload(),
+				record.timestamp(), headers));
+	}
+
+	@Override
+	public void close() {
+		// NO-OP
+	}
+
+}
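
A minimal usage sketch (the function, topics, and types below are hypothetical, not part of this commit): the MessagingFunction sees each record as a spring-messaging Message, and its reply is forwarded downstream. MessageBuilder is org.springframework.messaging.support.MessageBuilder.

MessagingMessageConverter converter = new MessagingMessageConverter();
// e.g. hand the message to a Spring Integration gateway; here we just upper-case the payload
MessagingFunction flow = message -> MessageBuilder
		.withPayload(((String) message.getPayload()).toUpperCase())
		.copyHeaders(message.getHeaders())
		.build();

KStream<String, String> stream = builder.stream("requests");
stream
		.process(() -> new MessagingProcessor<String, String, String, String>(flow, converter))
		.to("replies");
// note: the outgoing key is taken from the reply's KafkaHeaders.KEY header (null in this sketch)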

spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingTransformer.java

Lines changed: 2 additions & 0 deletions
@@ -44,8 +44,10 @@
  *
  * @author Gary Russell
  * @since 2.3
+ * @deprecated in favor of {@link MessagingProcessor}.
  *
  */
+@Deprecated
 public class MessagingTransformer<K, V, R> implements Transformer<K, V, KeyValue<K, R>> {
 
 	private final MessagingFunction function;

spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java

Lines changed: 5 additions & 12 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2020 the original author or authors.
+ * Copyright 2018-2022 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,14 +27,14 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Test;
@@ -174,19 +174,12 @@ public KStream<String, String> testStream(StreamsBuilder kStreamBuilder) {
 		KStream<String, String> stream = kStreamBuilder.stream("test_topic");
 
 		stream
-				.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
-					@Override
-					public void init(ProcessorContext context) {
-					}
+				.process(() -> new ContextualProcessor<String, String, String, String>() {
 
 					@Override
-					public KeyValue<String, String> transform(String key, String value) {
-						return null;
+					public void process(Record<String, String> record) {
 					}
 
-					@Override
-					public void close() {
-					}
 				}, "testStateStore")
 				.to("test_output");
 
spring-kafka/src/test/java/org/springframework/kafka/streams/HeaderEnricherTests.java renamed to spring-kafka/src/test/java/org/springframework/kafka/streams/HeaderEnricherProcessorTests.java

Lines changed: 2 additions & 3 deletions
@@ -43,7 +43,7 @@
  * @since 2.3
  *
  */
-public class HeaderEnricherTests {
+public class HeaderEnricherProcessorTests {
 
 	private static final String INPUT = "input";
 
@@ -56,10 +56,9 @@ void testWithDriver() {
 		headers.put("foo", new LiteralExpression("bar"));
 		SpelExpressionParser parser = new SpelExpressionParser();
 		headers.put("spel", parser.parseExpression("context.timestamp() + new String(key) + new String(value)"));
-		HeaderEnricher<String, String> enricher = new HeaderEnricher<>(headers);
 		KStream<String, String> stream = builder.stream(INPUT);
 		stream
-				.transform(() -> enricher)
+				.process(() -> new HeaderEnricherProcessor<>(headers))
 				.to(OUTPUT);
 
 		Properties config = new Properties();

spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java

Lines changed: 1 addition & 2 deletions
@@ -222,7 +222,6 @@ public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
 		headers.put("foo", new LiteralExpression("bar"));
 		SpelExpressionParser parser = new SpelExpressionParser();
 		headers.put("spel", parser.parseExpression("context.timestamp() + key + value"));
-		HeaderEnricher<Integer, String> enricher = new HeaderEnricher<>(headers);
 		stream.mapValues((ValueMapper<String, String>) String::toUpperCase)
 				.mapValues(Foo::new)
 				.repartition(Repartitioned.with(Serdes.Integer(), new JsonSerde<Foo>() { }))
@@ -233,7 +232,7 @@ public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
 				.toStream()
 				.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
 				.filter((i, s) -> s.length() > 40)
-				.transform(() -> enricher)
+				.process(() -> new HeaderEnricherProcessor<>(headers))
 				.to(streamingTopic2);
 
 		stream.print(Printed.toSysOut());
