Skip to content

Commit e956413

Browse files
authored
Refactor DeserializationException Detection Code
- move to `ErrorHandlingUtils` for reuse. * Pass class loader into utility method instead of an ApplicationContext.
1 parent 25671a0 commit e956413

File tree

3 files changed

+79
-49
lines changed

3 files changed

+79
-49
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -82,6 +82,7 @@ public abstract class AbstractMessageListenerContainer<K, V>
8282

8383
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); // NOSONAR
8484

85+
@NonNull
8586
protected final ConsumerFactory<K, V> consumerFactory; // NOSONAR (final)
8687

8788
private final ContainerProperties containerProperties;

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818

1919
import java.time.Duration;
2020
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Properties;
2123
import java.util.Set;
2224
import java.util.function.BiConsumer;
2325

2426
import org.apache.kafka.clients.consumer.Consumer;
27+
import org.apache.kafka.clients.consumer.ConsumerConfig;
2528
import org.apache.kafka.clients.consumer.ConsumerRecord;
2629
import org.apache.kafka.clients.consumer.ConsumerRecords;
2730
import org.apache.kafka.common.TopicPartition;
@@ -30,7 +33,11 @@
3033
import org.springframework.classify.BinaryExceptionClassifier;
3134
import org.springframework.core.log.LogAccessor;
3235
import org.springframework.kafka.KafkaException;
36+
import org.springframework.kafka.core.ConsumerFactory;
3337
import org.springframework.kafka.support.KafkaUtils;
38+
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
39+
import org.springframework.lang.Nullable;
40+
import org.springframework.util.ClassUtils;
3441
import org.springframework.util.backoff.BackOff;
3542
import org.springframework.util.backoff.BackOffExecution;
3643

@@ -256,4 +263,63 @@ public static Exception findRootCause(Exception exception) {
256263
return realException;
257264
}
258265

266+
/**
267+
* Determine whether the key or value deserializer is an instance of
268+
* {@link ErrorHandlingDeserializer}.
269+
* @param <K> the key type.
270+
* @param <V> the value type.
271+
* @param consumerFactory the consumer factory.
272+
* @param consumerOverrides consumer factory property overrides.
273+
* @param isValue true to find the value deserializer.
274+
* @param classLoader the class loader to load the deserializer class.
275+
* @return true if the deserializer is an instance of
276+
* {@link ErrorHandlingDeserializer}.
277+
* @since 3.0.10
278+
*/
279+
public static <K, V> boolean checkDeserializer(ConsumerFactory<K, V> consumerFactory,
280+
Properties consumerOverrides, boolean isValue, ClassLoader classLoader) {
281+
282+
Object deser = findDeserializerClass(consumerFactory, consumerOverrides, isValue);
283+
Class<?> deserializer = null;
284+
if (deser instanceof Class<?> deserClass) {
285+
deserializer = deserClass;
286+
}
287+
else if (deser instanceof String str) {
288+
try {
289+
deserializer = ClassUtils.forName(str, classLoader);
290+
}
291+
catch (ClassNotFoundException | LinkageError e) {
292+
throw new IllegalStateException(e);
293+
}
294+
}
295+
else if (deser != null) {
296+
throw new IllegalStateException("Deserializer must be a class or class name, not a " + deser.getClass());
297+
}
298+
return deserializer != null && ErrorHandlingDeserializer.class.isAssignableFrom(deserializer);
299+
}
300+
301+
@Nullable
302+
private static <K, V> Object findDeserializerClass(ConsumerFactory<K, V> consumerFactory,
303+
Properties consumerOverrides, boolean isValue) {
304+
305+
Map<String, Object> props = consumerFactory.getConfigurationProperties();
306+
Object configuredDeserializer = isValue
307+
? consumerFactory.getValueDeserializer()
308+
: consumerFactory.getKeyDeserializer();
309+
if (configuredDeserializer == null) {
310+
Object deser = consumerOverrides.get(isValue
311+
? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
312+
: ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
313+
if (deser == null) {
314+
deser = props.get(isValue
315+
? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
316+
: ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
317+
}
318+
return deser;
319+
}
320+
else {
321+
return configuredDeserializer.getClass();
322+
}
323+
}
324+
259325
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 11 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@
115115
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;
116116
import org.springframework.kafka.support.micrometer.MicrometerHolder;
117117
import org.springframework.kafka.support.serializer.DeserializationException;
118-
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
119118
import org.springframework.kafka.support.serializer.SerializationUtils;
120119
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
121120
import org.springframework.lang.Nullable;
@@ -129,7 +128,6 @@
129128
import org.springframework.transaction.support.TransactionSynchronizationManager;
130129
import org.springframework.transaction.support.TransactionTemplate;
131130
import org.springframework.util.Assert;
132-
import org.springframework.util.ClassUtils;
133131
import org.springframework.util.CollectionUtils;
134132
import org.springframework.util.ObjectUtils;
135133
import org.springframework.util.StringUtils;
@@ -919,10 +917,19 @@ else if (listener instanceof MessageListener) {
919917
this.logger.info(toString());
920918
}
921919
Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
920+
ApplicationContext applicationContext = getApplicationContext();
922921
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
923-
|| checkDeserializer(findDeserializerClass(props, consumerProperties, false));
922+
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
923+
consumerProperties, false,
924+
applicationContext == null
925+
? getClass().getClassLoader()
926+
: applicationContext.getClassLoader());
924927
this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull()
925-
|| checkDeserializer(findDeserializerClass(props, consumerProperties, true));
928+
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
929+
consumerProperties, true,
930+
applicationContext == null
931+
? getClass().getClassLoader()
932+
: applicationContext.getClassLoader());
926933
this.syncCommitTimeout = determineSyncCommitTimeout();
927934
if (this.containerProperties.getSyncCommitTimeout() == null) {
928935
// update the property so we can use it directly from code elsewhere
@@ -1247,27 +1254,6 @@ else if (timeout instanceof String str) {
12471254
}
12481255
}
12491256

1250-
@Nullable
1251-
private Object findDeserializerClass(Map<String, Object> props, Properties consumerOverrides, boolean isValue) {
1252-
Object configuredDeserializer = isValue
1253-
? KafkaMessageListenerContainer.this.consumerFactory.getValueDeserializer()
1254-
: KafkaMessageListenerContainer.this.consumerFactory.getKeyDeserializer();
1255-
if (configuredDeserializer == null) {
1256-
Object deser = consumerOverrides.get(isValue
1257-
? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
1258-
: ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
1259-
if (deser == null) {
1260-
deser = props.get(isValue
1261-
? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
1262-
: ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
1263-
}
1264-
return deser;
1265-
}
1266-
else {
1267-
return configuredDeserializer.getClass();
1268-
}
1269-
}
1270-
12711257
private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscribingConsumer) {
12721258
if (KafkaMessageListenerContainer.this.topicPartitions == null) {
12731259
ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener();
@@ -1293,29 +1279,6 @@ private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscr
12931279
}
12941280
}
12951281

1296-
private boolean checkDeserializer(@Nullable Object deser) {
1297-
Class<?> deserializer = null;
1298-
if (deser instanceof Class<?> deserClass) {
1299-
deserializer = deserClass;
1300-
}
1301-
else if (deser instanceof String str) {
1302-
try {
1303-
ApplicationContext applicationContext = getApplicationContext();
1304-
ClassLoader classLoader = applicationContext == null
1305-
? getClass().getClassLoader()
1306-
: applicationContext.getClassLoader();
1307-
deserializer = ClassUtils.forName(str, classLoader);
1308-
}
1309-
catch (ClassNotFoundException | LinkageError e) {
1310-
throw new IllegalStateException(e);
1311-
}
1312-
}
1313-
else if (deser != null) {
1314-
throw new IllegalStateException("Deserializer must be a class or class name, not a " + deser.getClass());
1315-
}
1316-
return deserializer != null && ErrorHandlingDeserializer.class.isAssignableFrom(deserializer);
1317-
}
1318-
13191282
protected void checkConsumer() {
13201283
long timeSinceLastPoll = System.currentTimeMillis() - this.lastPoll;
13211284
if (((float) timeSinceLastPoll) / (float) this.containerProperties.getPollTimeout()

0 commit comments

Comments
 (0)