Skip to content

Commit b6d39e4

Browse files
authored
GH-3696: DeserializationEx support for KafkaMS (#8689)
* GH-3696: DeserializationEx support for KafkaMS Fixes #3696 * Add an internal logic into `KafkaMessageSource` to react properly for the `ErrorHandlingDeserializer` configuration and re-throw `DeserializationException` * * Use `ErrorHandlingUtils` from `spring-kafka` to determine a `deserializer` from consumer properties * Revise the test in favor of just single `await().untilAsserted()` - no reason to wait for assignment or check for returned record. We just need to be sure that `DeserializationException` is thrown eventually
1 parent 2efaecd commit b6d39e4

File tree

5 files changed

+120
-5
lines changed

5 files changed

+120
-5
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ ext {
105105
springAmqpVersion = '3.0.6'
106106
springDataVersion = '2023.1.0-M1'
107107
springGraphqlVersion = '1.2.2'
108-
springKafkaVersion = '3.0.9'
108+
springKafkaVersion = '3.0.10-SNAPSHOT'
109109
springRetryVersion = '2.0.2'
110110
springSecurityVersion = '6.2.0-M1'
111111
springVersion = '6.1.0-M2'

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java

+44-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.LinkedHashSet;
2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.Properties;
2930
import java.util.Set;
3031
import java.util.TreeSet;
3132
import java.util.concurrent.ConcurrentHashMap;
@@ -46,6 +47,7 @@
4647
import org.apache.kafka.common.TopicPartition;
4748
import org.apache.kafka.common.errors.WakeupException;
4849

50+
import org.springframework.beans.factory.BeanClassLoaderAware;
4951
import org.springframework.core.log.LogAccessor;
5052
import org.springframework.integration.IntegrationMessageHeaderAccessor;
5153
import org.springframework.integration.acks.AcknowledgmentCallback;
@@ -58,6 +60,8 @@
5860
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5961
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
6062
import org.springframework.kafka.listener.ConsumerProperties;
63+
import org.springframework.kafka.listener.ErrorHandlingUtils;
64+
import org.springframework.kafka.listener.ListenerUtils;
6165
import org.springframework.kafka.listener.LoggingCommitCallback;
6266
import org.springframework.kafka.support.Acknowledgment;
6367
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
@@ -69,6 +73,8 @@
6973
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
7074
import org.springframework.kafka.support.converter.MessagingMessageConverter;
7175
import org.springframework.kafka.support.converter.RecordMessageConverter;
76+
import org.springframework.kafka.support.serializer.DeserializationException;
77+
import org.springframework.kafka.support.serializer.SerializationUtils;
7278
import org.springframework.lang.Nullable;
7379
import org.springframework.messaging.Message;
7480
import org.springframework.util.Assert;
@@ -102,7 +108,8 @@
102108
* @since 5.4
103109
*
104110
*/
105-
public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> implements Pausable {
111+
public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object>
112+
implements Pausable, BeanClassLoaderAware {
106113

107114
private static final long MIN_ASSIGN_TIMEOUT = 2000L;
108115

@@ -146,6 +153,10 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> impl
146153

147154
private Duration closeTimeout = Duration.ofSeconds(DEFAULT_CLOSE_TIMEOUT);
148155

156+
private boolean checkNullKeyForExceptions;
157+
158+
private boolean checkNullValueForExceptions;
159+
149160
private volatile Consumer<K, V> consumer;
150161

151162
private volatile boolean pausing;
@@ -158,6 +169,8 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> impl
158169

159170
public volatile boolean newAssignment; // NOSONAR - direct access from inner
160171

172+
private ClassLoader classLoader;
173+
161174
/**
162175
* Construct an instance with the supplied parameters. Fetching multiple
163176
* records per poll will be disabled.
@@ -257,11 +270,27 @@ public Collection<TopicPartition> getAssignedPartitions() {
257270
return Collections.unmodifiableCollection(this.assignedPartitions);
258271
}
259272

273+
@Override
274+
public void setBeanClassLoader(ClassLoader classLoader) {
275+
this.classLoader = classLoader;
276+
}
277+
260278
@Override
261279
protected void onInit() {
262280
if (!StringUtils.hasText(this.consumerProperties.getClientId())) {
263281
this.consumerProperties.setClientId(getComponentName());
264282
}
283+
284+
Map<String, Object> props = this.consumerFactory.getConfigurationProperties();
285+
Properties kafkaConsumerProperties = this.consumerProperties.getKafkaConsumerProperties();
286+
this.checkNullKeyForExceptions =
287+
this.consumerProperties.isCheckDeserExWhenKeyNull() ||
288+
ErrorHandlingUtils.checkDeserializer(this.consumerFactory, kafkaConsumerProperties, false,
289+
this.classLoader);
290+
this.checkNullValueForExceptions =
291+
this.consumerProperties.isCheckDeserExWhenValueNull() ||
292+
ErrorHandlingUtils.checkDeserializer(this.consumerFactory, kafkaConsumerProperties, true,
293+
this.classLoader);
265294
}
266295

267296
/**
@@ -609,6 +638,13 @@ record = this.recordsIterator.next();
609638
}
610639

611640
private Object recordToMessage(ConsumerRecord<K, V> record) {
641+
if (record.value() == null && this.checkNullValueForExceptions) {
642+
checkDeserializationException(record, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
643+
}
644+
if (record.key() == null && this.checkNullKeyForExceptions) {
645+
checkDeserializationException(record, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
646+
}
647+
612648
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
613649
KafkaAckInfo<K, V> ackInfo = new KafkaAckInfoImpl(record, topicPartition);
614650
AcknowledgmentCallback ackCallback = this.ackCallbackFactory.createCallback(ackInfo);
@@ -639,6 +675,13 @@ private Object recordToMessage(ConsumerRecord<K, V> record) {
639675
}
640676
}
641677

678+
private void checkDeserializationException(ConsumerRecord<K, V> cRecord, String headerName) {
679+
DeserializationException exception = ListenerUtils.getExceptionFromHeader(cRecord, headerName, this.logger);
680+
if (exception != null) {
681+
throw exception;
682+
}
683+
}
684+
642685
@Override
643686
public void destroy() {
644687
this.lock.lock();

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceIntegrationTests.java

+62-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,8 @@
2424
import org.apache.kafka.clients.consumer.ConsumerConfig;
2525
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
2626
import org.apache.kafka.common.TopicPartition;
27+
import org.apache.kafka.common.serialization.Deserializer;
28+
import org.junit.jupiter.api.BeforeAll;
2729
import org.junit.jupiter.api.Test;
2830

2931
import org.springframework.integration.channel.NullChannel;
@@ -32,11 +34,17 @@
3234
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3335
import org.springframework.kafka.core.KafkaTemplate;
3436
import org.springframework.kafka.listener.ConsumerProperties;
37+
import org.springframework.kafka.support.serializer.DeserializationException;
38+
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
3539
import org.springframework.kafka.test.utils.KafkaTestUtils;
3640
import org.springframework.messaging.Message;
3741
import org.springframework.messaging.support.GenericMessage;
42+
import org.springframework.util.ClassUtils;
3843

3944
import static org.assertj.core.api.Assertions.assertThat;
45+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
46+
import static org.awaitility.Awaitility.await;
47+
import static org.mockito.Mockito.mock;
4048

4149
/**
4250
* @author Gary Russell
@@ -50,9 +58,17 @@ class MessageSourceIntegrationTests {
5058

5159
static final String TOPIC1 = "MessageSourceIntegrationTests1";
5260

61+
static final String TOPIC2 = "MessageSourceIntegrationTests2";
62+
63+
static String brokers;
64+
65+
@BeforeAll
66+
static void setup() {
67+
brokers = System.getProperty("spring.global.embedded.kafka.brokers");
68+
}
69+
5370
@Test
5471
void testSource() throws Exception {
55-
String brokers = System.getProperty("spring.global.embedded.kafka.brokers");
5672
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(brokers, "testSource", "false");
5773
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
5874
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -122,4 +138,48 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
122138
template.destroy();
123139
}
124140

141+
@Test
142+
void deserializationErrorIsThrownFromSource() throws Exception {
143+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(brokers, "testErrorChannelSource", "false");
144+
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
145+
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, FailingDeserializer.class);
146+
147+
DefaultKafkaConsumerFactory<Integer, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
148+
ConsumerProperties consumerProperties = new ConsumerProperties(TOPIC2);
149+
150+
consumerProperties.setPollTimeout(10);
151+
152+
KafkaMessageSource<Integer, String> source = new KafkaMessageSource<>(consumerFactory, consumerProperties);
153+
source.setBeanClassLoader(ClassUtils.getDefaultClassLoader());
154+
source.setBeanFactory(mock());
155+
source.afterPropertiesSet();
156+
source.start();
157+
158+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(brokers);
159+
DefaultKafkaProducerFactory<Object, Object> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);
160+
KafkaTemplate<Object, Object> template = new KafkaTemplate<>(producerFactory);
161+
162+
String testData = "test data";
163+
template.send(TOPIC2, testData);
164+
165+
await().untilAsserted(() ->
166+
assertThatExceptionOfType(DeserializationException.class)
167+
.isThrownBy(source::receive)
168+
.hasFieldOrPropertyWithValue("data", testData.getBytes())
169+
.withMessage("failed to deserialize")
170+
.withStackTraceContaining("failed deserialization"));
171+
172+
source.destroy();
173+
template.destroy();
174+
}
175+
176+
public static class FailingDeserializer implements Deserializer<String> {
177+
178+
@Override
179+
public String deserialize(String topic, byte[] data) {
180+
throw new RuntimeException("failed deserialization");
181+
}
182+
183+
}
184+
125185
}

src/reference/asciidoc/kafka.adoc

+6-1
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,11 @@ If you set `allowMultiFetch` to `true` you must process all the retrieved record
436436

437437
Messages emitted by this adapter contain a header `kafka_remainingRecords` with a count of records remaining from the previous poll.
438438

439+
Starting with version `6.2`, the `KafkaMessageSource` supports an `ErrorHandlingDeserializer` provided in the consumer properties.
440+
A `DeserializationException` is extracted from record headers and thrown to the called.
441+
With a `SourcePollingChannelAdapter` this exception is wrapped into an `ErrorMessage` and published to its `errorChannel`.
442+
See https://docs.spring.io/spring-kafka/reference/html/#error-handling-deserializer[`ErrorHandlingDeserializer`] documentation for more information.
443+
439444
[[kafka-outbound-gateway]]
440445
=== Outbound Gateway
441446

@@ -448,7 +453,7 @@ It is suggested that you add a `ConsumerRebalanceListener` to the template's rep
448453

449454
The `KafkaProducerMessageHandler` `sendTimeoutExpression` default is `delivery.timeout.ms` Kafka producer property `+ 5000` so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework.
450455
This has been changed for consistency because you may get unexpected behavior (Spring may time out the `send()`, while it is actually, eventually, successful).
451-
IMPORTANT: That timeout is 120 seconds by default so you may wish to reduce it to get more timely failures.
456+
IMPORTANT: That timeout is 120 seconds by default, so you may wish to reduce it to get more timely failures.
452457

453458
[[kafka-outbound-gateway-configuration]]
454459
==== Configuration

src/reference/asciidoc/whats-new.adoc

+7
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,10 @@ See, for example, `transformWith()`, `splitWith()` in <<./dsl.adoc#java-dsl, Jav
3838
- For the server and client WebSocket containers, the send buffer overflow strategy is now configurable in `IntegrationWebSocketContainer` and in XML via `send-buffer-overflow-strategy`.
3939
This strategy determines the behavior when a session's outbound message buffer has reached the configured limit.
4040
See <<./web-sockets.adoc#websocket-client-container-attributes, WebSockets Support>> for more information.
41+
42+
43+
[[x6.2-kafka]]
44+
=== Apache Kafka Support Changes
45+
46+
The `KafkaMessageSource` now extracts an `ErrorHandlingDeserializer` configuration from the consumer properties and re-throws `DeserializationException` extracted from failed record headers.
47+
See <<./kafka.adoc#kafka-inbound-pollable, Kafka Inbound Channel Adapter>> for more information.

0 commit comments

Comments
 (0)