Skip to content

GH-3661: Resolve Spring Apache Kafka Deprecations #3751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -155,6 +155,16 @@ public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrat
this.errorMessageStrategy = errorMessageStrategy;
}

/**
* Get an {@link ErrorMessageStrategy} to use to build an error message when a
* exception occurs. Default is the {@link DefaultErrorMessageStrategy}.
* @return the errorMessageStrategy
* @since 6.0
*/
protected ErrorMessageStrategy getErrorMessageStrategy() {
return this.errorMessageStrategy;
}

protected MessagingTemplate getMessagingTemplate() {
return this.messagingTemplate;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -299,8 +299,8 @@ public boolean isLoggingEnabled() {
}

/**
* Set an {@link ErrorMessageStrategy} to use to build an error message when a exception occurs.
* Default is the {@link DefaultErrorMessageStrategy}.
* Set an {@link ErrorMessageStrategy} to use to build an error message when a
* exception occurs. Default is the {@link DefaultErrorMessageStrategy}.
* @param errorMessageStrategy the {@link ErrorMessageStrategy}.
* @since 4.3.10
*/
Expand All @@ -309,6 +309,16 @@ public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrat
this.errorMessageStrategy = errorMessageStrategy;
}

/**
* Get an {@link ErrorMessageStrategy} to use to build an error message when a
* exception occurs. Default is the {@link DefaultErrorMessageStrategy}.
* @return the errorMessageStrategy.
* @since 6.0
*/
protected ErrorMessageStrategy getErrorMessageStrategy() {
return this.errorMessageStrategy;
}

@Override
public ManagementOverrides getOverrides() {
return this.managementOverrides;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.integration.dsl.IntegrationComponentSpec;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionOffset;
Expand Down Expand Up @@ -81,17 +82,15 @@ public KafkaMessageListenerContainerSpec<K, V> concurrency(int concurrency) {
}

/**
* Specify an {@link org.springframework.kafka.listener.ErrorHandler} for the
* Specify an {@link org.springframework.kafka.listener.CommonErrorHandler} for the
* {@link org.springframework.kafka.listener.AbstractMessageListenerContainer}.
* @param errorHandler the {@link org.springframework.kafka.listener.ErrorHandler}.
* @param errorHandler the {@link org.springframework.kafka.listener.CommonErrorHandler}.
* @return the spec.
* @see org.springframework.kafka.listener.ErrorHandler
* @since 6.0
* @see org.springframework.kafka.listener.CommonErrorHandler
*/
@SuppressWarnings("deprecation")
public KafkaMessageListenerContainerSpec<K, V> errorHandler(
org.springframework.kafka.listener.GenericErrorHandler<?> errorHandler) {

this.target.setGenericErrorHandler(errorHandler);
public KafkaMessageListenerContainerSpec<K, V> errorHandler(CommonErrorHandler errorHandler) {
this.target.setCommonErrorHandler(errorHandler);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.kafka.inbound;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.core.AttributeAccessor;
import org.springframework.integration.core.ErrorMessagePublisher;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageChannel;

/**
* An extension of {@link ErrorMessagePublisher} that can be used in a
* {@link org.springframework.kafka.listener.CommonErrorHandler} for recovering Kafka
* delivery failures.
*
* @author Gary Russell
* @since 6.0
*
*/
public class KafkaErrorSendingMessageRecoverer extends ErrorMessagePublisher implements ConsumerRecordRecoverer {

/**
* Construct an instance to send to the channel with the
* {@link RawRecordHeaderErrorMessageStrategy}.
* @param channel the channel.
*/
public KafkaErrorSendingMessageRecoverer(MessageChannel channel) {
this(channel, new RawRecordHeaderErrorMessageStrategy());
}

/**
* Construct an instance to send the channel, using the error message strategy.
* @param channel the channel.
* @param errorMessageStrategy the strategy.
*/
public KafkaErrorSendingMessageRecoverer(MessageChannel channel, ErrorMessageStrategy errorMessageStrategy) {
setChannel(channel);
setErrorMessageStrategy(errorMessageStrategy);
}

@Override
public void accept(ConsumerRecord<?, ?> record, Exception ex) {
Throwable thrown = ex.getCause();
if (thrown == null) {
thrown = ex;
}
AttributeAccessor attrs = ErrorMessageUtils.getAttributeAccessor(null, null);
attrs.setAttribute(KafkaHeaders.RAW_DATA, record);
publish(thrown, attrs);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.kafka.inbound;

import org.apache.kafka.clients.consumer.Consumer;

import org.springframework.kafka.KafkaException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;

/**
* Implementations of this interface will generally support a retry template for retrying
* incoming deliveries and this supports adding common attributes to the retry context.
*
* @author Gary Russell
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any general JavaDoc, please?

* @since 6.0
*
*/
public interface KafkaInboundEndpoint {

/**
* {@link org.springframework.retry.RetryContext} attribute key for an acknowledgment
* if the listener is capable of acknowledging.
*/
String CONTEXT_ACKNOWLEDGMENT = "acknowledgment";

/**
* {@link org.springframework.retry.RetryContext} attribute key for the consumer if
* the listener is consumer-aware.
*/
String CONTEXT_CONSUMER = "consumer";

/**
* {@link org.springframework.retry.RetryContext} attribute key for the record.
*/
String CONTEXT_RECORD = "record";

/**
* Execute the runnable with the retry template and recovery callback.
* @param template the template.
* @param callback the callback.
* @param record the record (or records).
* @param acknowledgment the acknowledgment.
* @param consumer the consumer.
* @param runnable the runnable.
*/
default void doWithRetry(RetryTemplate template, RecoveryCallback<?> callback, Object data,
Acknowledgment acknowledgment, Consumer<?, ?> consumer, Runnable runnable) {

try {
template.execute(context -> {
context.setAttribute(CONTEXT_RECORD, data);
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
context.setAttribute(CONTEXT_CONSUMER, consumer);
runnable.run();
return null;
}, callback);
}
catch (Exception ex) {
throw new KafkaException("Failed to execute runnable", ex);
}
}

}
Loading