Skip to content

Commit 0550380

Browse files
authored
GH-3661: Resolve Spring Apache Kafka Deprecations
Resolves #3661 * Don't allow existing listener; remove unnecessary try/catch; add what's new. * Fix link in whats-new.adoc
1 parent 11c3acd commit 0550380

File tree

12 files changed

+648
-361
lines changed

12 files changed

+648
-361
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -155,6 +155,16 @@ public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrat
155155
this.errorMessageStrategy = errorMessageStrategy;
156156
}
157157

158+
/**
159+
* Get an {@link ErrorMessageStrategy} to use to build an error message when a
160+
* exception occurs. Default is the {@link DefaultErrorMessageStrategy}.
161+
* @return the errorMessageStrategy
162+
* @since 6.0
163+
*/
164+
protected ErrorMessageStrategy getErrorMessageStrategy() {
165+
return this.errorMessageStrategy;
166+
}
167+
158168
protected MessagingTemplate getMessagingTemplate() {
159169
return this.messagingTemplate;
160170
}

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -299,8 +299,8 @@ public boolean isLoggingEnabled() {
299299
}
300300

301301
/**
302-
* Set an {@link ErrorMessageStrategy} to use to build an error message when a exception occurs.
303-
* Default is the {@link DefaultErrorMessageStrategy}.
302+
* Set an {@link ErrorMessageStrategy} to use to build an error message when a
303+
* exception occurs. Default is the {@link DefaultErrorMessageStrategy}.
304304
* @param errorMessageStrategy the {@link ErrorMessageStrategy}.
305305
* @since 4.3.10
306306
*/
@@ -309,6 +309,16 @@ public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrat
309309
this.errorMessageStrategy = errorMessageStrategy;
310310
}
311311

312+
/**
313+
* Get an {@link ErrorMessageStrategy} to use to build an error message when a
314+
* exception occurs. Default is the {@link DefaultErrorMessageStrategy}.
315+
* @return the errorMessageStrategy.
316+
* @since 6.0
317+
*/
318+
protected ErrorMessageStrategy getErrorMessageStrategy() {
319+
return this.errorMessageStrategy;
320+
}
321+
312322
@Override
313323
public ManagementOverrides getOverrides() {
314324
return this.managementOverrides;

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageListenerContainerSpec.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.springframework.core.task.AsyncListenableTaskExecutor;
2525
import org.springframework.integration.dsl.IntegrationComponentSpec;
2626
import org.springframework.kafka.core.ConsumerFactory;
27+
import org.springframework.kafka.listener.CommonErrorHandler;
2728
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
2829
import org.springframework.kafka.listener.ContainerProperties;
2930
import org.springframework.kafka.support.TopicPartitionOffset;
@@ -81,17 +82,15 @@ public KafkaMessageListenerContainerSpec<K, V> concurrency(int concurrency) {
8182
}
8283

8384
/**
84-
* Specify an {@link org.springframework.kafka.listener.ErrorHandler} for the
85+
* Specify an {@link org.springframework.kafka.listener.CommonErrorHandler} for the
8586
* {@link org.springframework.kafka.listener.AbstractMessageListenerContainer}.
86-
* @param errorHandler the {@link org.springframework.kafka.listener.ErrorHandler}.
87+
* @param errorHandler the {@link org.springframework.kafka.listener.CommonErrorHandler}.
8788
* @return the spec.
88-
* @see org.springframework.kafka.listener.ErrorHandler
89+
* @since 6.0
90+
* @see org.springframework.kafka.listener.CommonErrorHandler
8991
*/
90-
@SuppressWarnings("deprecation")
91-
public KafkaMessageListenerContainerSpec<K, V> errorHandler(
92-
org.springframework.kafka.listener.GenericErrorHandler<?> errorHandler) {
93-
94-
this.target.setGenericErrorHandler(errorHandler);
92+
public KafkaMessageListenerContainerSpec<K, V> errorHandler(CommonErrorHandler errorHandler) {
93+
this.target.setCommonErrorHandler(errorHandler);
9594
return this;
9695
}
9796

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.kafka.inbound;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
21+
import org.springframework.core.AttributeAccessor;
22+
import org.springframework.integration.core.ErrorMessagePublisher;
23+
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
24+
import org.springframework.integration.support.ErrorMessageStrategy;
25+
import org.springframework.integration.support.ErrorMessageUtils;
26+
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
27+
import org.springframework.kafka.support.KafkaHeaders;
28+
import org.springframework.messaging.MessageChannel;
29+
30+
/**
31+
* An extension of {@link ErrorMessagePublisher} that can be used in a
32+
* {@link org.springframework.kafka.listener.CommonErrorHandler} for recovering Kafka
33+
* delivery failures.
34+
*
35+
* @author Gary Russell
36+
* @since 6.0
37+
*
38+
*/
39+
public class KafkaErrorSendingMessageRecoverer extends ErrorMessagePublisher implements ConsumerRecordRecoverer {
40+
41+
/**
42+
* Construct an instance to send to the channel with the
43+
* {@link RawRecordHeaderErrorMessageStrategy}.
44+
* @param channel the channel.
45+
*/
46+
public KafkaErrorSendingMessageRecoverer(MessageChannel channel) {
47+
this(channel, new RawRecordHeaderErrorMessageStrategy());
48+
}
49+
50+
/**
51+
* Construct an instance to send the channel, using the error message strategy.
52+
* @param channel the channel.
53+
* @param errorMessageStrategy the strategy.
54+
*/
55+
public KafkaErrorSendingMessageRecoverer(MessageChannel channel, ErrorMessageStrategy errorMessageStrategy) {
56+
setChannel(channel);
57+
setErrorMessageStrategy(errorMessageStrategy);
58+
}
59+
60+
@Override
61+
public void accept(ConsumerRecord<?, ?> record, Exception ex) {
62+
Throwable thrown = ex.getCause();
63+
if (thrown == null) {
64+
thrown = ex;
65+
}
66+
AttributeAccessor attrs = ErrorMessageUtils.getAttributeAccessor(null, null);
67+
attrs.setAttribute(KafkaHeaders.RAW_DATA, record);
68+
publish(thrown, attrs);
69+
}
70+
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.kafka.inbound;
18+
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
21+
import org.springframework.kafka.KafkaException;
22+
import org.springframework.kafka.support.Acknowledgment;
23+
import org.springframework.retry.RecoveryCallback;
24+
import org.springframework.retry.support.RetryTemplate;
25+
26+
/**
27+
* Implementations of this interface will generally support a retry template for retrying
28+
* incoming deliveries and this supports adding common attributes to the retry context.
29+
*
30+
* @author Gary Russell
31+
* @since 6.0
32+
*
33+
*/
34+
public interface KafkaInboundEndpoint {
35+
36+
/**
37+
* {@link org.springframework.retry.RetryContext} attribute key for an acknowledgment
38+
* if the listener is capable of acknowledging.
39+
*/
40+
String CONTEXT_ACKNOWLEDGMENT = "acknowledgment";
41+
42+
/**
43+
* {@link org.springframework.retry.RetryContext} attribute key for the consumer if
44+
* the listener is consumer-aware.
45+
*/
46+
String CONTEXT_CONSUMER = "consumer";
47+
48+
/**
49+
* {@link org.springframework.retry.RetryContext} attribute key for the record.
50+
*/
51+
String CONTEXT_RECORD = "record";
52+
53+
/**
54+
* Execute the runnable with the retry template and recovery callback.
55+
* @param template the template.
56+
* @param callback the callback.
57+
* @param record the record (or records).
58+
* @param acknowledgment the acknowledgment.
59+
* @param consumer the consumer.
60+
* @param runnable the runnable.
61+
*/
62+
default void doWithRetry(RetryTemplate template, RecoveryCallback<?> callback, Object data,
63+
Acknowledgment acknowledgment, Consumer<?, ?> consumer, Runnable runnable) {
64+
65+
try {
66+
template.execute(context -> {
67+
context.setAttribute(CONTEXT_RECORD, data);
68+
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
69+
context.setAttribute(CONTEXT_CONSUMER, consumer);
70+
runnable.run();
71+
return null;
72+
}, callback);
73+
}
74+
catch (Exception ex) {
75+
throw new KafkaException("Failed to execute runnable", ex);
76+
}
77+
}
78+
79+
}

0 commit comments

Comments
 (0)