diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java index d6f5885b644..eb3502e24b9 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -155,6 +155,16 @@ public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrat this.errorMessageStrategy = errorMessageStrategy; } + /** + * Get an {@link ErrorMessageStrategy} to use to build an error message when a + * exception occurs. Default is the {@link DefaultErrorMessageStrategy}. + * @return the errorMessageStrategy + * @since 6.0 + */ + protected ErrorMessageStrategy getErrorMessageStrategy() { + return this.errorMessageStrategy; + } + protected MessagingTemplate getMessagingTemplate() { return this.messagingTemplate; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java index 83a918c92f8..a6c770f3a94 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -299,8 +299,8 @@ public boolean isLoggingEnabled() { } /** - * Set an {@link ErrorMessageStrategy} to use to build an error message when a exception occurs. - * Default is the {@link DefaultErrorMessageStrategy}. + * Set an {@link ErrorMessageStrategy} to use to build an error message when a + * exception occurs. Default is the {@link DefaultErrorMessageStrategy}. * @param errorMessageStrategy the {@link ErrorMessageStrategy}. * @since 4.3.10 */ @@ -309,6 +309,16 @@ public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrat this.errorMessageStrategy = errorMessageStrategy; } + /** + * Get an {@link ErrorMessageStrategy} to use to build an error message when a + * exception occurs. Default is the {@link DefaultErrorMessageStrategy}. + * @return the errorMessageStrategy. + * @since 6.0 + */ + protected ErrorMessageStrategy getErrorMessageStrategy() { + return this.errorMessageStrategy; + } + @Override public ManagementOverrides getOverrides() { return this.managementOverrides; diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageListenerContainerSpec.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageListenerContainerSpec.java index 887c8e4130c..34b98d79c37 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageListenerContainerSpec.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageListenerContainerSpec.java @@ -24,6 +24,7 @@ import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.integration.dsl.IntegrationComponentSpec; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.support.TopicPartitionOffset; @@ -81,17 +82,15 @@ public KafkaMessageListenerContainerSpec concurrency(int concurrency) { } /** - * Specify an {@link org.springframework.kafka.listener.ErrorHandler} for the + * Specify an {@link org.springframework.kafka.listener.CommonErrorHandler} for the * {@link org.springframework.kafka.listener.AbstractMessageListenerContainer}. - * @param errorHandler the {@link org.springframework.kafka.listener.ErrorHandler}. + * @param errorHandler the {@link org.springframework.kafka.listener.CommonErrorHandler}. * @return the spec. - * @see org.springframework.kafka.listener.ErrorHandler + * @since 6.0 + * @see org.springframework.kafka.listener.CommonErrorHandler */ - @SuppressWarnings("deprecation") - public KafkaMessageListenerContainerSpec errorHandler( - org.springframework.kafka.listener.GenericErrorHandler errorHandler) { - - this.target.setGenericErrorHandler(errorHandler); + public KafkaMessageListenerContainerSpec errorHandler(CommonErrorHandler errorHandler) { + this.target.setCommonErrorHandler(errorHandler); return this; } diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaErrorSendingMessageRecoverer.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaErrorSendingMessageRecoverer.java new file mode 100644 index 00000000000..c591e6c4ae1 --- /dev/null +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaErrorSendingMessageRecoverer.java @@ -0,0 +1,71 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.kafka.inbound; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import org.springframework.core.AttributeAccessor; +import org.springframework.integration.core.ErrorMessagePublisher; +import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy; +import org.springframework.integration.support.ErrorMessageStrategy; +import org.springframework.integration.support.ErrorMessageUtils; +import org.springframework.kafka.listener.ConsumerRecordRecoverer; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.MessageChannel; + +/** + * An extension of {@link ErrorMessagePublisher} that can be used in a + * {@link org.springframework.kafka.listener.CommonErrorHandler} for recovering Kafka + * delivery failures. + * + * @author Gary Russell + * @since 6.0 + * + */ +public class KafkaErrorSendingMessageRecoverer extends ErrorMessagePublisher implements ConsumerRecordRecoverer { + + /** + * Construct an instance to send to the channel with the + * {@link RawRecordHeaderErrorMessageStrategy}. + * @param channel the channel. + */ + public KafkaErrorSendingMessageRecoverer(MessageChannel channel) { + this(channel, new RawRecordHeaderErrorMessageStrategy()); + } + + /** + * Construct an instance to send the channel, using the error message strategy. + * @param channel the channel. + * @param errorMessageStrategy the strategy. + */ + public KafkaErrorSendingMessageRecoverer(MessageChannel channel, ErrorMessageStrategy errorMessageStrategy) { + setChannel(channel); + setErrorMessageStrategy(errorMessageStrategy); + } + + @Override + public void accept(ConsumerRecord record, Exception ex) { + Throwable thrown = ex.getCause(); + if (thrown == null) { + thrown = ex; + } + AttributeAccessor attrs = ErrorMessageUtils.getAttributeAccessor(null, null); + attrs.setAttribute(KafkaHeaders.RAW_DATA, record); + publish(thrown, attrs); + } + +} diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundEndpoint.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundEndpoint.java new file mode 100644 index 00000000000..1b5f74f6fc9 --- /dev/null +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundEndpoint.java @@ -0,0 +1,79 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.kafka.inbound; + +import org.apache.kafka.clients.consumer.Consumer; + +import org.springframework.kafka.KafkaException; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.retry.RecoveryCallback; +import org.springframework.retry.support.RetryTemplate; + +/** + * Implementations of this interface will generally support a retry template for retrying + * incoming deliveries and this supports adding common attributes to the retry context. + * + * @author Gary Russell + * @since 6.0 + * + */ +public interface KafkaInboundEndpoint { + + /** + * {@link org.springframework.retry.RetryContext} attribute key for an acknowledgment + * if the listener is capable of acknowledging. + */ + String CONTEXT_ACKNOWLEDGMENT = "acknowledgment"; + + /** + * {@link org.springframework.retry.RetryContext} attribute key for the consumer if + * the listener is consumer-aware. + */ + String CONTEXT_CONSUMER = "consumer"; + + /** + * {@link org.springframework.retry.RetryContext} attribute key for the record. + */ + String CONTEXT_RECORD = "record"; + + /** + * Execute the runnable with the retry template and recovery callback. + * @param template the template. + * @param callback the callback. + * @param record the record (or records). + * @param acknowledgment the acknowledgment. + * @param consumer the consumer. + * @param runnable the runnable. + */ + default void doWithRetry(RetryTemplate template, RecoveryCallback callback, Object data, + Acknowledgment acknowledgment, Consumer consumer, Runnable runnable) { + + try { + template.execute(context -> { + context.setAttribute(CONTEXT_RECORD, data); + context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment); + context.setAttribute(CONTEXT_CONSUMER, consumer); + runnable.run(); + return null; + }, callback); + } + catch (Exception ex) { + throw new KafkaException("Failed to execute runnable", ex); + } + } + +} diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java index 3e0cbbbd348..f8b683120b9 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java @@ -31,6 +31,7 @@ import org.springframework.integration.context.OrderlyShutdownCapable; import org.springframework.integration.core.Pausable; import org.springframework.integration.gateway.MessagingGatewaySupport; +import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer; import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy; import org.springframework.integration.support.AbstractIntegrationMessageBuilder; import org.springframework.integration.support.ErrorMessageUtils; @@ -39,13 +40,13 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.ConsumerSeekAware; import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.converter.ConversionException; import org.springframework.kafka.support.converter.KafkaMessageHeaders; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHeaders; @@ -70,7 +71,8 @@ * @since 5.4 * */ -public class KafkaInboundGateway extends MessagingGatewaySupport implements Pausable, OrderlyShutdownCapable { +public class KafkaInboundGateway extends MessagingGatewaySupport + implements KafkaInboundEndpoint, Pausable, OrderlyShutdownCapable { private static final ThreadLocal ATTRIBUTES_HOLDER = new ThreadLocal<>(); @@ -130,6 +132,12 @@ public void setPayloadType(Class payloadType) { * Specify a {@link RetryTemplate} instance to wrap * {@link KafkaInboundGateway.IntegrationRecordMessageListener} into * {@code RetryingMessageListenerAdapter}. + *

+ * IMPORTANT: This form of retry is blocking and could cause a rebalance if the + * aggregate retry delays across all polled records might exceed the + * {@code max.poll.interval.ms}. Instead, consider adding a + * {@code DefaultErrorHandler} to the listener container, configured with a + * {@link KafkaErrorSendingMessageRecoverer}. * @param retryTemplate the {@link RetryTemplate} to use. */ public void setRetryTemplate(RetryTemplate retryTemplate) { @@ -137,10 +145,13 @@ public void setRetryTemplate(RetryTemplate retryTemplate) { } /** - * A {@link RecoveryCallback} instance for retry operation; - * if null, the exception will be thrown to the container after retries are exhausted - * (unless an error channel is configured). - * Does not make sense if {@link #setRetryTemplate(RetryTemplate)} isn't specified. + * A {@link RecoveryCallback} instance for retry operation; if null, the exception + * will be thrown to the container after retries are exhausted (unless an error + * channel is configured). Only used if + * {@link #setRetryTemplate(RetryTemplate)} is specified. Default is an + * {@link ErrorMessageSendingRecoverer} if an error channel has been provided. Set to + * null if you wish to throw the exception back to the container after retries are + * exhausted. * @param recoveryCallback the recovery callback. */ public void setRecoveryCallback(RecoveryCallback recoveryCallback) { @@ -170,19 +181,21 @@ public void setBindSourceRecord(boolean bindSourceRecord) { this.bindSourceRecord = bindSourceRecord; } - @SuppressWarnings("deprecation") @Override protected void onInit() { super.onInit(); - MessageListener kafkaListener = this.listener; if (this.retryTemplate != null) { - kafkaListener = - new org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter<>(kafkaListener, - this.retryTemplate, this.recoveryCallback); this.retryTemplate.registerListener(this.listener); + MessageChannel errorChannel = getErrorChannel(); + if (this.recoveryCallback != null && errorChannel != null) { + this.recoveryCallback = new ErrorMessageSendingRecoverer(errorChannel, getErrorMessageStrategy()); + } } ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties(); - containerProperties.setMessageListener(kafkaListener); + Object existing = containerProperties.getMessageListener(); + Assert.state(existing == null, () -> "listener container cannot have an existing message listener (" + existing + + ")"); + containerProperties.setMessageListener(this.listener); this.containerDeliveryAttemptPresent = containerProperties.isDeliveryAttemptHeader(); } @@ -236,9 +249,11 @@ public int afterShutdown() { * attributes for use by the {@link org.springframework.integration.support.ErrorMessageStrategy}. * @param record the record. * @param message the message. + * @param conversionError a conversion error occurred. */ - private void setAttributesIfNecessary(Object record, Message message) { - boolean needHolder = getErrorChannel() != null && this.retryTemplate == null; + private void setAttributesIfNecessary(Object record, @Nullable Message message, boolean conversionError) { + boolean needHolder = ATTRIBUTES_HOLDER.get() == null + && (getErrorChannel() != null && (this.retryTemplate == null || conversionError)); boolean needAttributes = needHolder | this.retryTemplate != null; if (needHolder) { ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null)); @@ -281,36 +296,63 @@ public void onPartitionsAssigned(Map assignments, Consumer public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer) { Message message = null; try { - message = enhanceHeaders(toMessagingMessage(record, acknowledgment, consumer), record); - setAttributesIfNecessary(record, message); + message = toMessagingMessage(record, acknowledgment, consumer); } - catch (RuntimeException e) { + catch (RuntimeException ex) { + if (KafkaInboundGateway.this.retryTemplate == null) { + setAttributesIfNecessary(record, null, true); + } MessageChannel errorChannel = getErrorChannel(); if (errorChannel != null) { KafkaInboundGateway.this.messagingTemplate.send(errorChannel, buildErrorMessage(null, - new ConversionException("Failed to convert to message", record, e))); + new ConversionException("Failed to convert to message", record, ex))); + } + else { + throw ex; } } if (message != null) { - try { - Message reply = sendAndReceiveMessage(message); - if (reply != null) { - reply = enhanceReply(message, reply); - KafkaInboundGateway.this.kafkaTemplate.send(reply); - } - } - finally { - if (KafkaInboundGateway.this.retryTemplate == null) { - ATTRIBUTES_HOLDER.remove(); - } - } + sendAndReceive(record, message, acknowledgment, consumer); } else { KafkaInboundGateway.this.logger.debug(() -> "Converter returned a null message for: " + record); } } - private Message enhanceHeaders(Message message, ConsumerRecord record) { + private void sendAndReceive(ConsumerRecord record, Message message, Acknowledgment acknowledgment, + Consumer consumer) { + + RetryTemplate template = KafkaInboundGateway.this.retryTemplate; + if (template != null) { + doWithRetry(template, KafkaInboundGateway.this.recoveryCallback, record, acknowledgment, consumer, + () -> { + doSendAndReceive(enhanceHeadersAndSaveAttributes(message, record)); + }); + } + else { + doSendAndReceive(enhanceHeadersAndSaveAttributes(message, record)); + } + } + + private void doSendAndReceive(Message message) { + try { + Message reply = sendAndReceiveMessage(message); + if (reply != null) { + reply = enhanceReply(message, reply); + KafkaInboundGateway.this.kafkaTemplate.send(reply); + } + else { + this.logger.debug(() -> "No reply received for " + message); + } + } + finally { + if (KafkaInboundGateway.this.retryTemplate == null) { + ATTRIBUTES_HOLDER.remove(); + } + } + } + + private Message enhanceHeadersAndSaveAttributes(Message message, ConsumerRecord record) { Message messageToReturn = message; if (message.getHeaders() instanceof KafkaMessageHeaders) { Map rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders(); @@ -345,6 +387,7 @@ else if (KafkaInboundGateway.this.containerDeliveryAttemptPresent) { } messageToReturn = builder.build(); } + setAttributesIfNecessary(record, messageToReturn, false); return messageToReturn; } diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java index 6e37423682e..931653492ba 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java @@ -33,6 +33,7 @@ import org.springframework.integration.context.OrderlyShutdownCapable; import org.springframework.integration.core.Pausable; import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer; import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageUtils; import org.springframework.integration.support.MessageBuilder; @@ -53,9 +54,9 @@ import org.springframework.kafka.support.converter.KafkaMessageHeaders; import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.support.ErrorMessage; import org.springframework.retry.RecoveryCallback; import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; @@ -76,8 +77,8 @@ * * @since 5.4 */ -public class KafkaMessageDrivenChannelAdapter extends MessageProducerSupport implements OrderlyShutdownCapable, - Pausable { +public class KafkaMessageDrivenChannelAdapter extends MessageProducerSupport + implements KafkaInboundEndpoint, OrderlyShutdownCapable, Pausable { private static final ThreadLocal ATTRIBUTES_HOLDER = new ThreadLocal<>(); @@ -105,6 +106,8 @@ public class KafkaMessageDrivenChannelAdapter extends MessageProducerSuppo private boolean containerDeliveryAttemptPresent; + private boolean doFilterInRetry; + /** * Construct an instance with mode {@link ListenerMode#record}. * @param messageListenerContainer the container. @@ -189,6 +192,12 @@ public void setAckDiscarded(boolean ackDiscarded) { * Specify a {@link RetryTemplate} instance to wrap * {@link KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener} into * {@code RetryingMessageListenerAdapter}. + *

+ * IMPORTANT: This form of retry is blocking and could cause a rebalance if the + * aggregate retry delays across all polled records might exceed the + * {@code max.poll.interval.ms}. Instead, consider adding a + * {@code DefaultErrorHandler} to the listener container, configured with a + * {@link KafkaErrorSendingMessageRecoverer}. * @param retryTemplate the {@link RetryTemplate} to use. */ public void setRetryTemplate(RetryTemplate retryTemplate) { @@ -198,10 +207,13 @@ public void setRetryTemplate(RetryTemplate retryTemplate) { } /** - * A {@link RecoveryCallback} instance for retry operation; - * if null, the exception will be thrown to the container after retries are exhausted - * (unless an error channel is configured). - * Does not make sense if {@link #setRetryTemplate(RetryTemplate)} isn't specified. + * A {@link RecoveryCallback} instance for retry operation; if null, the exception + * will be thrown to the container after retries are exhausted (unless an error + * channel is configured). Only used if a + * {@link #setRetryTemplate(RetryTemplate)} is specified. Default is an + * {@link ErrorMessageSendingRecoverer} if an error channel has been provided. Set to + * null if you wish to throw the exception back to the container after retries are + * exhausted. * @param recoveryCallback the recovery callback. */ public void setRecoveryCallback(RecoveryCallback recoveryCallback) { @@ -215,6 +227,8 @@ public void setRecoveryCallback(RecoveryCallback recoveryCallback) { * if both of them are present. * Does not make sense if only one of {@link RetryTemplate} or * {@link RecordFilterStrategy} is present, or any. + * When true, the filter is called for each retry; when false, the filter is only + * called once for each delivery from the container. * @param filterInRetry the order for {@code RetryingMessageListenerAdapter} and * {@link FilteringMessageListenerAdapter} wrapping. Defaults to {@code false}. */ @@ -261,40 +275,30 @@ public String getComponentType() { return "kafka:message-driven-channel-adapter"; } - @SuppressWarnings("deprecation") @Override protected void onInit() { super.onInit(); - if (this.retryTemplate != null) { - Assert.state(getErrorChannel() == null, "Cannot have an 'errorChannel' property when a 'RetryTemplate' is " - + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to " - + "send an error message when retries are exhausted"); - } ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties(); + Object existing = containerProperties.getMessageListener(); + Assert.state(existing == null, () -> "listener container cannot have an existing message listener (" + existing + + ")"); if (this.mode.equals(ListenerMode.record)) { MessageListener listener = this.recordListener; - boolean doFilterInRetry = this.filterInRetry && this.retryTemplate != null + this.doFilterInRetry = this.filterInRetry && this.retryTemplate != null && this.recordFilterStrategy != null; - if (doFilterInRetry) { - listener = new FilteringMessageListenerAdapter<>(listener, this.recordFilterStrategy, - this.ackDiscarded); - listener = new org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter<>(listener, - this.retryTemplate, this.recoveryCallback); + if (this.retryTemplate != null) { + MessageChannel errorChannel = getErrorChannel(); + if (this.recoveryCallback != null && errorChannel != null) { + this.recoveryCallback = new ErrorMessageSendingRecoverer(errorChannel, getErrorMessageStrategy()); + } this.retryTemplate.registerListener(this.recordListener); } - else { - if (this.retryTemplate != null) { - listener = new org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter<>(listener, - this.retryTemplate, this.recoveryCallback); - this.retryTemplate.registerListener(this.recordListener); - } - if (this.recordFilterStrategy != null) { - listener = new FilteringMessageListenerAdapter<>(listener, this.recordFilterStrategy, - this.ackDiscarded); - } + if (!this.doFilterInRetry && this.recordFilterStrategy != null) { + listener = new FilteringMessageListenerAdapter<>(listener, this.recordFilterStrategy, + this.ackDiscarded); } containerProperties.setMessageListener(listener); } @@ -353,9 +357,11 @@ public int afterShutdown() { * attributes for use by the {@link org.springframework.integration.support.ErrorMessageStrategy}. * @param record the record. * @param message the message. + * @param conversionError a conversion error occurred. */ - private void setAttributesIfNecessary(Object record, Message message) { - boolean needHolder = getErrorChannel() != null && this.retryTemplate == null; + private void setAttributesIfNecessary(Object record, @Nullable Message message, boolean conversionError) { + boolean needHolder = ATTRIBUTES_HOLDER.get() == null + && (getErrorChannel() != null && (this.retryTemplate == null || conversionError)); boolean needAttributes = needHolder | this.retryTemplate != null; if (needHolder) { ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null)); @@ -430,20 +436,45 @@ public void onPartitionsAssigned(Map assignments, Consumer @Override public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer) { + Message message = null; try { - message = enhanceHeaders(toMessagingMessage(record, acknowledgment, consumer), record); - setAttributesIfNecessary(record, message); + message = toMessagingMessage(record, acknowledgment, consumer); } - catch (RuntimeException e) { - RuntimeException exception = new ConversionException("Failed to convert to message", record, e); - sendErrorMessageIfNecessary(null, exception); + catch (RuntimeException ex) { + if (KafkaMessageDrivenChannelAdapter.this.retryTemplate == null) { + setAttributesIfNecessary(record, null, true); + } + MessageChannel errorChannel = getErrorChannel(); + if (errorChannel != null) { + RuntimeException exception = new ConversionException("Failed to convert to message", record, ex); + sendErrorMessageIfNecessary(null, exception); + } + else { + throw ex; + } } + RetryTemplate template = KafkaMessageDrivenChannelAdapter.this.retryTemplate; + if (template != null) { + Message toSend = message; + doWithRetry(template, KafkaMessageDrivenChannelAdapter.this.recoveryCallback, record, acknowledgment, + consumer, () -> { + if (!KafkaMessageDrivenChannelAdapter.this.filterInRetry || passesFilter(record)) { + sendMessageIfAny(enhanceHeadersAndSaveAttributes(toSend, record), record); + } + }); + } + else { + sendMessageIfAny(enhanceHeadersAndSaveAttributes(message, record), record); + } + } - sendMessageIfAny(message, record); + private boolean passesFilter(ConsumerRecord record) { + RecordFilterStrategy filter = KafkaMessageDrivenChannelAdapter.this.recordFilterStrategy; + return filter == null || !filter.filter(record); } - private Message enhanceHeaders(Message message, ConsumerRecord record) { + private Message enhanceHeadersAndSaveAttributes(Message message, ConsumerRecord record) { Message messageToReturn = message; if (message.getHeaders() instanceof KafkaMessageHeaders) { Map rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders(); @@ -478,6 +509,7 @@ else if (KafkaMessageDrivenChannelAdapter.this.containerDeliveryAttemptPresent) } messageToReturn = builder.build(); } + setAttributesIfNecessary(record, messageToReturn, false); return messageToReturn; } @@ -522,21 +554,59 @@ public void onPartitionsAssigned(Map assignments, Consumer public void onMessage(List> records, Acknowledgment acknowledgment, Consumer consumer) { + Message message = null; + if (!KafkaMessageDrivenChannelAdapter.this.filterInRetry) { + message = toMessage(records, acknowledgment, consumer); + } + if (message != null) { + RetryTemplate template = KafkaMessageDrivenChannelAdapter.this.retryTemplate; + if (template != null) { + doWIthRetry(records, acknowledgment, consumer, message, template); + } + else { + sendMessageIfAny(message, records); + } + } + } + + private void doWIthRetry(List> records, Acknowledgment acknowledgment, + Consumer consumer, Message message, RetryTemplate template) { + + doWithRetry(template, KafkaMessageDrivenChannelAdapter.this.recoveryCallback, records, acknowledgment, + consumer, () -> { + if (KafkaMessageDrivenChannelAdapter.this.filterInRetry) { + List> filtered = + KafkaMessageDrivenChannelAdapter.this.recordFilterStrategy.filterBatch(records); + Message toSend = message; + if (filtered.size() != records.size()) { + toSend = toMessage(filtered, acknowledgment, consumer); + } + sendMessageIfAny(toSend, filtered); + } + }); + } + + @Nullable + private Message toMessage(List> records, Acknowledgment acknowledgment, + Consumer consumer) { + Message message = null; try { message = toMessagingMessage(records, acknowledgment, consumer); - setAttributesIfNecessary(records, message); + setAttributesIfNecessary(records, message, false); } - catch (RuntimeException e) { + catch (RuntimeException ex) { Exception exception = new ConversionException("Failed to convert to message", - records.stream().collect(Collectors.toList()), e); + records.stream().collect(Collectors.toList()), ex); MessageChannel errorChannel = getErrorChannel(); if (errorChannel != null) { - getMessagingTemplate().send(errorChannel, new ErrorMessage(exception)); + getMessagingTemplate().send(errorChannel, buildErrorMessage(message, exception)); + } + else { + throw ex; } } - - sendMessageIfAny(message, records); + return message; } @Override diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java index de125b4639e..43a12d74aa7 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests.java @@ -109,7 +109,7 @@ void testKafkaBatchMessageDrivenChannelAdapterParser() { } @Test - @SuppressWarnings({ "unchecked", "deprecation" }) + @SuppressWarnings("unchecked") void testKafkaMessageDrivenChannelAdapterOptions() { DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(Collections.emptyMap()); @@ -133,17 +133,14 @@ void testKafkaMessageDrivenChannelAdapterOptions() { adapter.setRecordFilterStrategy(null); adapter.setRetryTemplate(new RetryTemplate()); + container.getContainerProperties().setMessageListener(null); adapter.afterPropertiesSet(); messageListener = containerProps.getMessageListener(); - assertThat(messageListener).isInstanceOf( - org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.class); - - delegate = TestUtils.getPropertyValue(messageListener, "delegate"); - - assertThat(delegate.getClass().getName()).contains("$IntegrationRecordMessageListener"); + assertThat(messageListener.getClass().getName()).contains("$IntegrationRecordMessageListener"); adapter.setRecordFilterStrategy(mock(RecordFilterStrategy.class)); + container.getContainerProperties().setMessageListener(null); adapter.afterPropertiesSet(); messageListener = containerProps.getMessageListener(); @@ -151,19 +148,16 @@ void testKafkaMessageDrivenChannelAdapterOptions() { delegate = TestUtils.getPropertyValue(messageListener, "delegate"); - assertThat(delegate).isInstanceOf( - org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.class); + assertThat(delegate.getClass().getName()).contains("$IntegrationRecordMessageListener"); adapter.setFilterInRetry(true); + container.getContainerProperties().setMessageListener(null); adapter.afterPropertiesSet(); messageListener = containerProps.getMessageListener(); - assertThat(messageListener).isInstanceOf( - org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.class); - - delegate = TestUtils.getPropertyValue(messageListener, "delegate"); + assertThat(messageListener.getClass().getName()).contains("$IntegrationRecordMessageListener"); - assertThat(delegate).isInstanceOf(FilteringMessageListenerAdapter.class); + assertThat(adapter).extracting("doFilterInRetry").isEqualTo(Boolean.TRUE); } } diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java index adeac2a2b0b..e327e6744a9 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java @@ -45,13 +45,13 @@ import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; -import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer; import org.springframework.integration.kafka.channel.PollableKafkaChannel; +import org.springframework.integration.kafka.inbound.KafkaErrorSendingMessageRecoverer; +import org.springframework.integration.kafka.inbound.KafkaInboundGateway; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; import org.springframework.integration.kafka.inbound.KafkaMessageSource; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; import org.springframework.integration.kafka.support.KafkaIntegrationHeaders; -import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy; import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.test.util.TestUtils; import org.springframework.kafka.annotation.EnableKafka; @@ -61,9 +61,11 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConsumerProperties; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties.AckMode; +import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.listener.GenericMessageListenerContainer; import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.kafka.listener.MessageListenerContainer; @@ -83,6 +85,7 @@ import org.springframework.retry.support.RetryTemplate; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.backoff.FixedBackOff; /** * @author Artem Bilan @@ -163,6 +166,15 @@ public class KafkaDslTests { @Autowired private Gate gate; + @Autowired + private KafkaInboundGateway server; + + @Autowired + private CommonErrorHandler eh; + + @Autowired + private QueueChannel recoveringErrorChannel; + @Test void testKafkaAdapters() throws Exception { this.sendToKafkaFlowInput.send(new GenericMessage<>("foo", Collections.singletonMap("foo", "bar"))); @@ -234,6 +246,12 @@ void testKafkaAdapters() throws Exception { void testGateways() throws Exception { assertThat(this.config.replyContainerLatch.await(30, TimeUnit.SECONDS)).isTrue(); assertThat(this.gate.exchange(TEST_TOPIC4, "foo")).isEqualTo("FOO"); + assertThat(this.server).extracting("messageListenerContainer") + .extracting("commonErrorHandler") + .isSameAs(this.eh); + Message received = this.recoveringErrorChannel.receive(10_000); + assertThat(received).isNotNull(); + assertThat(received.getHeaders().get(KafkaHeaders.RAW_DATA)).isNotNull(); } @Test @@ -290,10 +308,8 @@ public IntegrationFlow topic1ListenerFromKafkaFlow() { c.ackMode(ContainerProperties.AckMode.MANUAL) .idleEventInterval(100L) .id("topic1ListenerContainer")) - .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(), - new RawRecordHeaderErrorMessageStrategy())) + .errorChannel(errorChannel()) .retryTemplate(new RetryTemplate()) - .filterInRetry(true) .onPartitionsAssignedSeekCallback((map, callback) -> ContextConfiguration.this.onPartitionsAssignedCalledLatch.countDown())) .filter(Message.class, m -> @@ -304,16 +320,12 @@ public IntegrationFlow topic1ListenerFromKafkaFlow() { .get(); } - @SuppressWarnings("deprecation") @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckMode(AckMode.MANUAL); - factory.setRecoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(), - new RawRecordHeaderErrorMessageStrategy())); - factory.setRetryTemplate(new RetryTemplate()); return factory; } @@ -322,8 +334,7 @@ public IntegrationFlow topic2ListenerFromKafkaFlow() { return IntegrationFlows .from(Kafka .messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2), - KafkaMessageDrivenChannelAdapter.ListenerMode.record) - .filterInRetry(true)) + KafkaMessageDrivenChannelAdapter.ListenerMode.record)) .filter(Message.class, m -> m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101, f -> f.throwExceptionOnRejection(true)) @@ -469,11 +480,46 @@ public void onPartitionsAssigned(Collection partitions) { public IntegrationFlow serverGateway() { return IntegrationFlows .from(Kafka.inboundGateway(consumerFactory(), containerProperties(), - producerFactory())) + producerFactory()) + .configureListenerContainer(container -> container.errorHandler(eh()))) .transform(String::toUpperCase) .get(); } + @Bean + CommonErrorHandler eh() { + return new DefaultErrorHandler(); + } + + @Bean + IntegrationFlow withRecoveringErrorHandler() { + ContainerProperties props = containerProperties(); + props.setGroupId("wreh"); + return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory(), props) + .configureListenerContainer(container -> { + container.errorHandler(recoveringErrorHandler()); + })) + .handle(p -> { + throw new RuntimeException("test"); + }) + .get(); + } + + @Bean + CommonErrorHandler recoveringErrorHandler() { + return new DefaultErrorHandler(recoverer(), new FixedBackOff(0L, 0L)); + } + + @Bean + KafkaErrorSendingMessageRecoverer recoverer() { + return new KafkaErrorSendingMessageRecoverer(recoveringErrorChannel()); + } + + @Bean + QueueChannel recoveringErrorChannel() { + return new QueueChannel(); + } + private ContainerProperties containerProperties() { ContainerProperties containerProperties = new ContainerProperties(TEST_TOPIC4); containerProperties.setGroupId("inGateGroup"); diff --git a/spring-integration-kafka/src/test/kotlin/org/springframework/integration/kafka/dsl/kotlin/KafkaDslKotlinTests.kt b/spring-integration-kafka/src/test/kotlin/org/springframework/integration/kafka/dsl/kotlin/KafkaDslKotlinTests.kt index a6843b7a9d7..f1f9ddb25fc 100644 --- a/spring-integration-kafka/src/test/kotlin/org/springframework/integration/kafka/dsl/kotlin/KafkaDslKotlinTests.kt +++ b/spring-integration-kafka/src/test/kotlin/org/springframework/integration/kafka/dsl/kotlin/KafkaDslKotlinTests.kt @@ -36,11 +36,9 @@ import org.springframework.integration.channel.QueueChannel import org.springframework.integration.config.EnableIntegration import org.springframework.integration.dsl.Pollers import org.springframework.integration.dsl.integrationFlow -import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer import org.springframework.integration.kafka.dsl.Kafka import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler -import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy import org.springframework.integration.support.MessageBuilder import org.springframework.integration.test.util.TestUtils import org.springframework.kafka.annotation.EnableKafka @@ -239,8 +237,7 @@ class KafkaDslKotlinTests { it.ackMode(ContainerProperties.AckMode.MANUAL) .id("topic1ListenerContainer") } - .recoveryCallback(ErrorMessageSendingRecoverer(errorChannel(), - RawRecordHeaderErrorMessageStrategy())) + .errorChannel(errorChannel()) .retryTemplate(RetryTemplate()) .filterInRetry(true)) { filter>({ m -> (m.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY] as Int) < 101 }) { throwExceptionOnRejection(true) } @@ -254,8 +251,7 @@ class KafkaDslKotlinTests { Kafka.messageDrivenChannelAdapter(consumerFactory(), KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC2) .configureListenerContainer { it.ackMode(ContainerProperties.AckMode.MANUAL) } - .recoveryCallback(ErrorMessageSendingRecoverer(errorChannel(), - RawRecordHeaderErrorMessageStrategy())) + .errorChannel(errorChannel()) .retryTemplate(RetryTemplate()) .filterInRetry(true)) { filter>({ m -> (m.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY] as Int) < 101 }) { throwExceptionOnRejection(true) } diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index bd993dfae83..110acbfb368 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -93,46 +93,13 @@ The `KafkaProducerMessageHandler.sendTimeoutExpression` default has changed from This has been changed for consistency because you may get unexpected behavior (Spring may timeout the send, while it is actually, eventually, successful). IMPORTANT: That timeout is 120 seconds by default so you may wish to reduce it to get more timely failures. -==== Java Configuration +==== Configuration -The following example shows how to configure the outbound channel adapter for Apache Kafka with Java: +The following example shows how to configure the outbound channel adapter for Apache Kafka: ==== -[source, java] ----- -@Bean -@ServiceActivator(inputChannel = "toKafka") -public MessageHandler handler() throws Exception { - KafkaProducerMessageHandler handler = - new KafkaProducerMessageHandler<>(kafkaTemplate()); - handler.setTopicExpression(new LiteralExpression("someTopic")); - handler.setMessageKeyExpression(new LiteralExpression("someKey")); - handler.setSuccessChannel(successes()); - handler.setFailureChannel(failures()); - return handler; -} - -@Bean -public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(producerFactory()); -} - -@Bean -public ProducerFactory producerFactory() { - Map props = new HashMap<>(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); - // set more properties - return new DefaultKafkaProducerFactory<>(props); -} ----- -==== - -==== Java DSL Configuration - -The following example shows how to configure the outbound channel adapter for Apache Kafka with Spring Integration Java DSL: - -==== -[source, java] +[source, java, role="primary"] +.Java DSL ---- @Bean public ProducerFactory producerFactory() { @@ -173,14 +140,36 @@ private KafkaProducerMessageHandlerSpec kafkaMessageHandler( .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic)); } ---- -==== - -==== XML Configuration +[source, java, role="secondary"] +.Java +---- +@Bean +@ServiceActivator(inputChannel = "toKafka") +public MessageHandler handler() throws Exception { + KafkaProducerMessageHandler handler = + new KafkaProducerMessageHandler<>(kafkaTemplate()); + handler.setTopicExpression(new LiteralExpression("someTopic")); + handler.setMessageKeyExpression(new LiteralExpression("someKey")); + handler.setSuccessChannel(successes()); + handler.setFailureChannel(failures()); + return handler; +} -The following example shows how to configure the Kafka outbound channel adapter with XML: +@Bean +public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); +} -==== -[source, xml] +@Bean +public ProducerFactory producerFactory() { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); + // set more properties + return new DefaultKafkaProducerFactory<>(props); +} +---- +[source, xml, role="secondary"] +.XML ---- - adapter(KafkaMessageListenerContainer container) { - KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = - new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record); - kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); - return kafkaMessageDrivenChannelAdapter; -} - -@Bean -public KafkaMessageListenerContainer container() throws Exception { - ContainerProperties properties = new ContainerProperties(this.topic); - // set more properties - return new KafkaMessageListenerContainer<>(consumerFactory(), properties); -} - -@Bean -public ConsumerFactory consumerFactory() { - Map props = new HashMap<>(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); - // set more properties - return new DefaultKafkaConsumerFactory<>(props); -} ----- -==== +IMPORTANT: This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records might exceed the `{@code `max.poll.interval.ms` consumer property. +Instead, consider adding a `DefaultErrorHandler` to the listener container, configured with a `KafkaErrorSendingMessageRecoverer`. -==== Java DSL Configuration +==== Configuration -The following example shows how to configure a message-driven channel adapter with the Spring Integration Java DSL: +The following example shows how to configure a message-driven channel adapter: ==== -[source, java] +[source, java, role="primary"] +.Java DSL ---- @Bean public IntegrationFlow topic1ListenerFromKafkaFlow() { @@ -296,40 +255,35 @@ public IntegrationFlow topic1ListenerFromKafkaFlow() { .get(); } ---- -==== - -You can also use the container factory that is used for `@KafkaListener` annotations to create `ConcurrentMessageListenerContainer` instances for other purposes. -See https://docs.spring.io/spring-kafka/docs/current/reference/html/[the Spring for Apache Kafka documentation] for an example. - -With the Java DSL, the container does not have to be configured as a `@Bean`, because the DSL registers the container as a bean. -The following example shows how to do so: - -==== -[source, java] +[source, java, role="secondary"] +.Java ---- @Bean -public IntegrationFlow topic2ListenerFromKafkaFlow() { - return IntegrationFlows - .from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2), - KafkaMessageDrivenChannelAdapter.ListenerMode.record) - .id("topic2Adapter")) - ... - get(); +public KafkaMessageDrivenChannelAdapter + adapter(KafkaMessageListenerContainer container) { + KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = + new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record); + kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); + return kafkaMessageDrivenChannelAdapter; } ----- -==== - -Notice that, in this case, the adapter is given an `id` (`topic2Adapter`). -The container is registered in the application context with a name of `topic2Adapter.container`. -If the adapter does not have an `id` property, the container's bean name is the container's fully qualified class name plus `#n`, where `n` is incremented for each container. -==== XML Configuration - - -The following example shows how to configure a message-driven channel adapter with XML: +@Bean +public KafkaMessageListenerContainer container() throws Exception { + ContainerProperties properties = new ContainerProperties(this.topic); + // set more properties + return new KafkaMessageListenerContainer<>(consumerFactory(), properties); +} -==== -[source, xml] +@Bean +public ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); + // set more properties + return new DefaultKafkaConsumerFactory<>(props); +} +---- +[source, xml, role="secondary"] +.XML ---- source(ConsumerFactory cf) { - KafkaMessageSource source = new KafkaMessageSource<>(cf, "myTopic"); - source.setGroupId("myGroupId"); - source.setClientId("myClientId"); - return source; +public IntegrationFlow topic2ListenerFromKafkaFlow() { + return IntegrationFlows + .from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2), + KafkaMessageDrivenChannelAdapter.ListenerMode.record) + .id("topic2Adapter")) + ... + get(); } ---- ==== -Refer to the javadocs for available properties. - -[[max-poll-records]] -By default, `max.poll.records` must be either explicitly set in the consumer factory, or it will be forced to 1 if the consumer factory is a `DefaultKafkaConsumerFactory`. -You can set the property `allowMultiFetch` to `true` to override this behavior. +Notice that, in this case, the adapter is given an `id` (`topic2Adapter`). +The container is registered in the application context with a name of `topic2Adapter.container`. +If the adapter does not have an `id` property, the container's bean name is the container's fully qualified class name plus `#n`, where `n` is incremented for each container. -IMPORTANT: You must poll the consumer within `max.poll.interval.ms` to avoid a rebalance. -If you set `allowMultiFetch` to `true` you must process all the retrieved records, and poll again, within `max.poll.interval.ms`. +[[kafka-inbound-pollable]] +=== Inbound Channel Adapter -Messages emitted by this adapter contain a header `kafka_remainingRecords` with a count of records remaining from the previous poll. +The `KafkaMessageSource` provides a pollable channel adapter implementation. -==== Java DSL Configuration +==== Configuration ==== -[source, java] +[source, java, role="primary"] +.Java DSL ---- @Bean public IntegrationFlow flow(ConsumerFactory cf) { @@ -410,12 +363,20 @@ public IntegrationFlow flow(ConsumerFactory cf) { .get(); } ---- -==== - -==== XML Configuration - -==== -[source, xml] +[source, java, role="secondary"] +.Java +---- +@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000")) +@Bean +public KafkaMessageSource source(ConsumerFactory cf) { + KafkaMessageSource source = new KafkaMessageSource<>(cf, "myTopic"); + source.setGroupId("myGroupId"); + source.setClientId("myClientId"); + return source; +} +---- +[source, xml, role="secondary"] +.XML ---- cf) { ---- ==== +Refer to the javadocs for available properties. + +[[max-poll-records]] +By default, `max.poll.records` must be either explicitly set in the consumer factory, or it will be forced to 1 if the consumer factory is a `DefaultKafkaConsumerFactory`. +You can set the property `allowMultiFetch` to `true` to override this behavior. + +IMPORTANT: You must poll the consumer within `max.poll.interval.ms` to avoid a rebalance. +If you set `allowMultiFetch` to `true` you must process all the retrieved records, and poll again, within `max.poll.interval.ms`. + +Messages emitted by this adapter contain a header `kafka_remainingRecords` with a count of records remaining from the previous poll. + [[kafka-outbound-gateway]] === Outbound Gateway @@ -449,12 +421,26 @@ The `KafkaProducerMessageHandler` `sendTimeoutExpression` default is `delivery.t This has been changed for consistency because you may get unexpected behavior (Spring may timeout the `send()`, while it is actually, eventually, successful). IMPORTANT: That timeout is 120 seconds by default so you may wish to reduce it to get more timely failures. -==== Java Configuration +==== Configuration -The following example shows how to configure a gateway with Java: +The following example shows how to configure a gateway: ==== -[source, java] +[source, java, role="primary"] +.Java DSL +---- +@Bean +public IntegrationFlow outboundGateFlow( + ReplyingKafkaTemplate kafkaTemplate) { + + return IntegrationFlows.from("kafkaRequests") + .handle(Kafka.outboundGateway(kafkaTemplate)) + .channel("kafkaReplies") + .get(); +} +---- +[source, java, role="secondary"] +.Java ---- @Bean @ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies") @@ -463,6 +449,27 @@ public KafkaProducerMessageHandler outGateway( return new KafkaProducerMessageHandler<>(kafkaTemplate); } ---- +[source, xml, role="secondary"] +.XML +---- + +---- ==== Refer to the javadocs for available properties. @@ -479,25 +486,6 @@ The reply topic is determined as follows: You can also specify a `KafkaHeaders.REPLY_PARTITION` header to determine a specific partition to be used for replies. Again, this is validated against the template's reply container's subscriptions. -==== Java DSL Configuration - -The following example shows how to configure an outbound gateway with the Java DSL: - -==== -[source, java] ----- -@Bean -public IntegrationFlow outboundGateFlow( - ReplyingKafkaTemplate kafkaTemplate) { - - return IntegrationFlows.from("kafkaRequests") - .handle(Kafka.outboundGateway(kafkaTemplate)) - .channel("kafkaReplies") - .get(); -} ----- -==== - Alternatively, you can also use a configuration similar to the following bean: ==== @@ -514,40 +502,32 @@ public IntegrationFlow outboundGateFlow() { ---- ==== -==== XML Configuration - -==== -[source, xml] ----- - ----- -==== - [[kafka-inbound-gateway]] === Inbound Gateway The inbound gateway is for request/reply operations. -The following example shows how to configure an inbound gateway with Java: +==== Configuration + +The following example shows how to configure an inbound gateway: ==== -[source, java] +[source, java, role="primary"] +.Java DSL +---- +@Bean +public IntegrationFlow serverGateway( + ConcurrentMessageListenerContainer container, + KafkaTemplate replyTemplate) { + return IntegrationFlows + .from(Kafka.inboundGateway(container, replyTemplate) + .replyTimeout(30_000)) + .transform(String::toUpperCase) + .get(); +} +---- +[source, java, role="secondary"] +.Java ---- @Bean public KafkaInboundGateway inboundGateway( @@ -562,27 +542,40 @@ public KafkaInboundGateway inboundGateway( return gateway; } ---- +[source, xml, role="secondary"] +.XML +---- + +---- ==== Refer to the javadocs for available properties. -The following example shows how to configure a simple upper case converter with the Java DSL: +When a `RetryTemplate` is provided, delivery failures are retried according to its retry policy. +If an `error-channel` is also supplied, a default `ErrorMessageSendingRecoverer` will be used as the recovery callback after retries are exhausted. +You can also use the `recovery-callback` to specify some other action to take in that case, or set it to `null` to throw the final exception to the listener container so it is handled there. -==== -[source, java] ----- -@Bean -public IntegrationFlow serverGateway( - ConcurrentMessageListenerContainer container, - KafkaTemplate replyTemplate) { - return IntegrationFlows - .from(Kafka.inboundGateway(container, replyTemplate) - .replyTimeout(30_000)) - .transform(String::toUpperCase) - .get(); -} ----- -==== +When building an `ErrorMessage` (for use in the `error-channel` or `recovery-callback`), you can customize the error message by setting the `error-message-strategy` property. +By default, a `RawRecordHeaderErrorMessageStrategy` is used, to provide access to the converted message as well as the raw `ConsumerRecord`. + +IMPORTANT: This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records might exceed the `{@code `max.poll.interval.ms` consumer property. +Instead, consider adding a `DefaultErrorHandler` to the listener container, configured with a `KafkaErrorSendingMessageRecoverer`. + +The following example shows how to configure a simple upper case converter with the Java DSL: Alternatively, you could configure an upper-case converter by using code similar to the following: @@ -604,31 +597,6 @@ public IntegrationFlow serverGateway() { You can also use the container factory that is used for `@KafkaListener` annotations to create `ConcurrentMessageListenerContainer` instances for other purposes. See https://docs.spring.io/spring-kafka/docs/current/reference/html/[the Spring for Apache Kafka documentation] and <> for examples. -==== XML Configuration - -==== -[source, xml] ----- - ----- -==== - -See the XML schema for a description of each property. - [[kafka-channels]] === Channels Backed by Apache Kafka Topics @@ -639,7 +607,8 @@ Each channel requires a `KafkaTemplate` for the sending side and either a listen ==== Java DSL Configuration ==== -[source, java] +[source, java, role="primary"] +.Java DSL ---- @Bean public IntegrationFlow flowWithSubscribable(KafkaTemplate template, @@ -686,12 +655,8 @@ public IntegrationFlow flowWithPollable(KafkaTemplate template, .get(); } ---- -==== - -==== Java Configuration - -==== -[source, java] +[source, java, role="secondary"] +.Java ---- /** * Channel for a single subscriber. @@ -732,12 +697,8 @@ PollableKafkaChannel pollable(KafkaTemplate template, return channel; } ---- -==== - -==== XML Configuration - -==== -[source, xml] +[source, xml, role="secondary"] +.XML ---- @@ -771,7 +732,7 @@ The following example shows how to do so in XML configuration: send-timeout="5000" channel="nullChannel" message-converter="messageConverter" - payload-type="com.example.Foo" + payload-type="com.example.Thing" error-channel="errorChannel" /> new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record); kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); kafkaMessageDrivenChannelAdapter.setMessageConverter(converter()); - kafkaMessageDrivenChannelAdapter.setPayloadType(Foo.class); + kafkaMessageDrivenChannelAdapter.setPayloadType(Thing.class); return kafkaMessageDrivenChannelAdapter; } ---- diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 4b6e550af86..57628155758 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -40,3 +40,11 @@ See <<./http.adoc#http,HTTP Support>> for more information. The `spring-integration-rmi` module has been removed altogether after being deprecated in previous versions. There is no replacement: it is recommended to migrate to more secure network and application protocols, such as WebSockets, RSockets, gRPC or REST. + +=== Apache Kafka Changes + +When providing a `RetryTemplate` on the inbound gateway or message-driven channel adapter, if an `errorChannel` is also provided, an `ErrorMessageSendingRecoverer` is automatically configured. + +In addition, the new `KafkaErrorMessageSendingRecoverer` is provided; this can be used with a `DefaultErrorHandler` to avoid issues with long aggregated retry delays causing partitions rebalances. + +See <<./kafka.adoc#kafka,Spring for Apache Kafka Support>> for more information.