Skip to content

GH-3661: Resolve Spring Apache Kafka Deprecations #3751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.springframework.retry.support.RetryTemplate;

/**
* Implementations of this interface will generally support a retry template for retrying
* incoming deliveries and this supports adding common attributes to the retry context.
*
* @author Gary Russell
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any general JavaDoc, please?

* @since 6.0
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,8 @@ protected void onInit() {
}
ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties();
Object existing = containerProperties.getMessageListener();
if (existing != null) {
logger.warn(() -> "Container's existing message listener ("
+ existing
+ ") replaced by this endpoint");
}
Assert.state(existing == null, () -> "listener container cannot have an existing message listener (" + existing
+ ")");
containerProperties.setMessageListener(this.listener);
this.containerDeliveryAttemptPresent = containerProperties.isDeliveryAttemptHeader();
}
Expand Down Expand Up @@ -252,9 +249,11 @@ public int afterShutdown() {
* attributes for use by the {@link org.springframework.integration.support.ErrorMessageStrategy}.
* @param record the record.
* @param message the message.
* @param conversionError a conversion error occurred.
*/
private void setAttributesIfNecessary(Object record, Message<?> message) {
boolean needHolder = getErrorChannel() != null && this.retryTemplate == null;
private void setAttributesIfNecessary(Object record, @Nullable Message<?> message, boolean conversionError) {
boolean needHolder = ATTRIBUTES_HOLDER.get() == null
&& (getErrorChannel() != null && (this.retryTemplate == null || conversionError));
boolean needAttributes = needHolder | this.retryTemplate != null;
if (needHolder) {
ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
Expand Down Expand Up @@ -299,14 +298,17 @@ public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment
try {
message = toMessagingMessage(record, acknowledgment, consumer);
}
catch (RuntimeException e) {
catch (RuntimeException ex) {
if (KafkaInboundGateway.this.retryTemplate == null) {
message = enhanceHeaders(message, record);
setAttributesIfNecessary(record, null, true);
}
MessageChannel errorChannel = getErrorChannel();
if (errorChannel != null) {
KafkaInboundGateway.this.messagingTemplate.send(errorChannel, buildErrorMessage(null,
new ConversionException("Failed to convert to message", record, e)));
new ConversionException("Failed to convert to message", record, ex)));
}
else {
throw ex;
}
}
if (message != null) {
Expand All @@ -324,39 +326,11 @@ private void sendAndReceive(ConsumerRecord<K, V> record, Message<?> message, Ack
if (template != null) {
doWithRetry(template, KafkaInboundGateway.this.recoveryCallback, record, acknowledgment, consumer,
() -> {
Message<?> toSend = enhanceHeaders(message, record, acknowledgment, consumer);
if (toSend != null) {
doSendAndReceive(message);
}
doSendAndReceive(enhanceHeadersAndSaveAttributes(message, record));
});
}
else {
Message<?> toSend = enhanceHeaders(message, record, acknowledgment, consumer);
if (toSend != null) {
doSendAndReceive(toSend);
}
}
}

@Nullable
private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> record,
Acknowledgment acknowledgment, Consumer<?, ?> consumer) {

try {
Message<?> message2 = enhanceHeaders(message, record);
setAttributesIfNecessary(record, message2);
return message2;
}
catch (RuntimeException ex) {
MessageChannel errorChannel = getErrorChannel();
if (errorChannel != null) {
KafkaInboundGateway.this.messagingTemplate.send(errorChannel, buildErrorMessage(null,
new ConversionException("Failed to convert to message", record, ex)));
return null;
}
else {
throw ex;
}
doSendAndReceive(enhanceHeadersAndSaveAttributes(message, record));
}
}

Expand All @@ -367,6 +341,9 @@ private void doSendAndReceive(Message<?> message) {
reply = enhanceReply(message, reply);
KafkaInboundGateway.this.kafkaTemplate.send(reply);
}
else {
this.logger.debug(() -> "No reply received for " + message);
}
}
finally {
if (KafkaInboundGateway.this.retryTemplate == null) {
Expand All @@ -375,7 +352,7 @@ private void doSendAndReceive(Message<?> message) {
}
}

private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> record) {
private Message<?> enhanceHeadersAndSaveAttributes(Message<?> message, ConsumerRecord<K, V> record) {
Message<?> messageToReturn = message;
if (message.getHeaders() instanceof KafkaMessageHeaders) {
Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
Expand Down Expand Up @@ -410,6 +387,7 @@ else if (KafkaInboundGateway.this.containerDeliveryAttemptPresent) {
}
messageToReturn = builder.build();
}
setAttributesIfNecessary(record, messageToReturn, false);
return messageToReturn;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,8 @@ protected void onInit() {

ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties();
Object existing = containerProperties.getMessageListener();
if (existing != null) {
logger.warn(() -> "Container's existing message listener ("
+ existing
+ ") replaced by this endpoint");
}
Assert.state(existing == null, () -> "listener container cannot have an existing message listener (" + existing
+ ")");
if (this.mode.equals(ListenerMode.record)) {
MessageListener<K, V> listener = this.recordListener;

Expand Down Expand Up @@ -360,9 +357,11 @@ public int afterShutdown() {
* attributes for use by the {@link org.springframework.integration.support.ErrorMessageStrategy}.
* @param record the record.
* @param message the message.
* @param conversionError a conversion error occurred.
*/
private void setAttributesIfNecessary(Object record, Message<?> message) {
boolean needHolder = getErrorChannel() != null && this.retryTemplate == null;
private void setAttributesIfNecessary(Object record, @Nullable Message<?> message, boolean conversionError) {
boolean needHolder = ATTRIBUTES_HOLDER.get() == null
&& (getErrorChannel() != null && (this.retryTemplate == null || conversionError));
boolean needAttributes = needHolder | this.retryTemplate != null;
if (needHolder) {
ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
Expand Down Expand Up @@ -438,19 +437,35 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
@Override
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {

Message<?> message = null;
try {
message = toMessagingMessage(record, acknowledgment, consumer);
}
catch (RuntimeException ex) {
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate == null) {
setAttributesIfNecessary(record, null, true);
}
MessageChannel errorChannel = getErrorChannel();
if (errorChannel != null) {
RuntimeException exception = new ConversionException("Failed to convert to message", record, ex);
sendErrorMessageIfNecessary(null, exception);
}
else {
throw ex;
}
}
RetryTemplate template = KafkaMessageDrivenChannelAdapter.this.retryTemplate;
if (template != null) {
Message<?> toSend = message;
doWithRetry(template, KafkaMessageDrivenChannelAdapter.this.recoveryCallback, record, acknowledgment,
consumer, () -> {
Message<?> message = enhanceHeaders(record, acknowledgment, consumer);
if (!KafkaMessageDrivenChannelAdapter.this.filterInRetry || passesFilter(record)) {
sendMessageIfAny(message, record);
sendMessageIfAny(enhanceHeadersAndSaveAttributes(toSend, record), record);
}
});
}
else {
Message<?> message = enhanceHeaders(record, acknowledgment, consumer);
sendMessageIfAny(message, record);
sendMessageIfAny(enhanceHeadersAndSaveAttributes(message, record), record);
}
}

Expand All @@ -459,22 +474,7 @@ private boolean passesFilter(ConsumerRecord<K, V> record) {
return filter == null || !filter.filter(record);
}

private Message<?> enhanceHeaders(ConsumerRecord<K, V> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {

Message<?> message = null;
try {
message = enhanceHeaders(toMessagingMessage(record, acknowledgment, consumer), record);
setAttributesIfNecessary(record, message);
}
catch (RuntimeException e) {
RuntimeException exception = new ConversionException("Failed to convert to message", record, e);
sendErrorMessageIfNecessary(null, exception);
}
return message;
}

private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> record) {
private Message<?> enhanceHeadersAndSaveAttributes(Message<?> message, ConsumerRecord<K, V> record) {
Message<?> messageToReturn = message;
if (message.getHeaders() instanceof KafkaMessageHeaders) {
Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
Expand Down Expand Up @@ -509,6 +509,7 @@ else if (KafkaMessageDrivenChannelAdapter.this.containerDeliveryAttemptPresent)
}
messageToReturn = builder.build();
}
setAttributesIfNecessary(record, messageToReturn, false);
return messageToReturn;
}

Expand Down Expand Up @@ -592,7 +593,7 @@ private Message<?> toMessage(List<ConsumerRecord<K, V>> records, Acknowledgment
Message<?> message = null;
try {
message = toMessagingMessage(records, acknowledgment, consumer);
setAttributesIfNecessary(records, message);
setAttributesIfNecessary(records, message, false);
}
catch (RuntimeException ex) {
Exception exception = new ConversionException("Failed to convert to message",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,14 @@ void testKafkaMessageDrivenChannelAdapterOptions() {

adapter.setRecordFilterStrategy(null);
adapter.setRetryTemplate(new RetryTemplate());
container.getContainerProperties().setMessageListener(null);
adapter.afterPropertiesSet();

messageListener = containerProps.getMessageListener();
assertThat(messageListener.getClass().getName()).contains("$IntegrationRecordMessageListener");

adapter.setRecordFilterStrategy(mock(RecordFilterStrategy.class));
container.getContainerProperties().setMessageListener(null);
adapter.afterPropertiesSet();

messageListener = containerProps.getMessageListener();
Expand All @@ -149,6 +151,7 @@ void testKafkaMessageDrivenChannelAdapterOptions() {
assertThat(delegate.getClass().getName()).contains("$IntegrationRecordMessageListener");

adapter.setFilterInRetry(true);
container.getContainerProperties().setMessageListener(null);
adapter.afterPropertiesSet();

messageListener = containerProps.getMessageListener();
Expand Down
4 changes: 2 additions & 2 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ If you hand off the message to another thread, you must not call its methods.

When a `retry-template` is provided, delivery failures are retried according to its retry policy.
If an `error-channel` is also supplied, a default `ErrorMessageSendingRecoverer` will be used as the recovery callback after retries are exhausted.
You can also use the `recovery-callback` to specify some other action to take in that case.
You can also use the `recovery-callback` to specify some other action to take in that case, or set it to `null` to throw the final exception to the listener container so it is handled there.

When building an `ErrorMessage` (for use in the `error-channel` or `recovery-callback`), you can customize the error message by setting the `error-message-strategy` property.
By default, a `RawRecordHeaderErrorMessageStrategy` is used, to provide access to the converted message as well as the raw `ConsumerRecord`.
Expand Down Expand Up @@ -567,7 +567,7 @@ Refer to the javadocs for available properties.

When a `RetryTemplate` is provided, delivery failures are retried according to its retry policy.
If an `error-channel` is also supplied, a default `ErrorMessageSendingRecoverer` will be used as the recovery callback after retries are exhausted.
You can also use the `recovery-callback` to specify some other action to take in that case.
You can also use the `recovery-callback` to specify some other action to take in that case, or set it to `null` to throw the final exception to the listener container so it is handled there.

When building an `ErrorMessage` (for use in the `error-channel` or `recovery-callback`), you can customize the error message by setting the `error-message-strategy` property.
By default, a `RawRecordHeaderErrorMessageStrategy` is used, to provide access to the converted message as well as the raw `ConsumerRecord`.
Expand Down
8 changes: 8 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,11 @@ See <<./http.adoc#http,HTTP Support>> for more information.

The `spring-integration-rmi` module has been removed altogether after being deprecated in previous versions.
There is no replacement: it is recommended to migrate to more secure network and application protocols, such as WebSockets, RSockets, gRPC or REST.

=== Apache Kafka Changes

When providing a `RetryTemplate` on the inbound gateway or message-driven channel adapter, if an `errorChannel` is also provided, an `ErrorMessageSendingRecoverer` is automatically configured.

In addition, the new `KafkaErrorMessageSendingRecoverer` is provided; this can be used with a `DefaultErrorHandler` to avoid issues with long aggregated retry delays causing partitions rebalances.

See <<>> and <<>> for more information.