Skip to content

Commit 5a27bac

Browse files
spring-projectsGH-3050: Delegating EH delegates compatibility
Fixes: spring-projects#3050 Correct CommonDelegatingErrorHandler validation for delegates compatibility. Add documentation stating that delegates must be compatible with default error handler.
1 parent f25b46f commit 5a27bac

File tree

3 files changed

+51
-4
lines changed

3 files changed

+51
-4
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,8 @@ This is to cause the transaction to roll back (if transactions are enabled).
351351
The `CommonDelegatingErrorHandler` can delegate to different error handlers, depending on the exception type.
352352
For example, you may wish to invoke a `DefaultErrorHandler` for most exceptions, or a `CommonContainerStoppingErrorHandler` for others.
353353

354+
All delegates must share the same compatible properties (`ackAfterHandle`, `seekAfterError` ...).
355+
354356
[[log-eh]]
355357
== Logging Error Handler
356358

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2023 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -65,6 +65,7 @@ public CommonDelegatingErrorHandler(CommonErrorHandler defaultErrorHandler) {
6565
* Set the delegate error handlers; a {@link LinkedHashMap} argument is recommended so
6666
* that the delegates are searched in a known order.
6767
* @param delegates the delegates.
68+
* @throws IllegalArgumentException if any of the delegates is not compatible with the default error handler.
6869
*/
6970
public void setErrorHandlers(Map<Class<? extends Throwable>, CommonErrorHandler> delegates) {
7071
Assert.notNull(delegates, "'delegates' cannot be null");
@@ -109,6 +110,7 @@ public void setAckAfterHandle(boolean ack) {
109110
* Add a delegate to the end of the current collection.
110111
* @param throwable the throwable for this handler.
111112
* @param handler the handler.
113+
* @throws IllegalArgumentException if the handler is not compatible with the default error handler.
112114
*/
113115
public void addDelegate(Class<? extends Throwable> throwable, CommonErrorHandler handler) {
114116
Map<Class<? extends Throwable>, CommonErrorHandler> delegatesToCheck = new LinkedHashMap<>(this.delegates);
@@ -118,13 +120,12 @@ public void addDelegate(Class<? extends Throwable> throwable, CommonErrorHandler
118120
this.delegates.putAll(delegatesToCheck);
119121
}
120122

121-
@SuppressWarnings("deprecation")
122123
private void checkDelegatesAndUpdateClassifier(Map<Class<? extends Throwable>,
123124
CommonErrorHandler> delegatesToCheck) {
124125

125126
boolean ackAfterHandle = this.defaultErrorHandler.isAckAfterHandle();
126127
boolean seeksAfterHandling = this.defaultErrorHandler.seeksAfterHandling();
127-
this.delegates.values().forEach(handler -> {
128+
delegatesToCheck.values().forEach(handler -> {
128129
Assert.isTrue(ackAfterHandle == handler.isAckAfterHandle(),
129130
"All delegates must return the same value when calling 'isAckAfterHandle()'");
130131
Assert.isTrue(seeksAfterHandling == handler.seeksAfterHandling(),

spring-kafka/src/test/java/org/springframework/kafka/listener/CommonDelegatingErrorHandlerTests.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,9 @@
1717
package org.springframework.kafka.listener;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2021
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.BDDMockito.given;
2123
import static org.mockito.Mockito.mock;
2224
import static org.mockito.Mockito.never;
2325
import static org.mockito.Mockito.verify;
@@ -173,6 +175,48 @@ void testDefaultDelegateIsApplied() {
173175
verify(defaultHandler).handleRemaining(any(), any(), any(), any());
174176
}
175177

178+
@Test
179+
void testAddIncompatibleAckAfterHandleDelegate() {
180+
var defaultHandler = mock(CommonErrorHandler.class);
181+
given(defaultHandler.isAckAfterHandle()).willReturn(true);
182+
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
183+
var delegate = mock(CommonErrorHandler.class);
184+
given(delegate.isAckAfterHandle()).willReturn(false);
185+
186+
assertThatThrownBy(() -> delegatingErrorHandler.addDelegate(IllegalStateException.class, delegate))
187+
.isInstanceOf(IllegalArgumentException.class)
188+
.hasMessage("All delegates must return the same value when calling 'isAckAfterHandle()'");
189+
}
190+
191+
@Test
192+
void testAddIncompatibleSeeksAfterHandlingDelegate() {
193+
var defaultHandler = mock(CommonErrorHandler.class);
194+
given(defaultHandler.seeksAfterHandling()).willReturn(true);
195+
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
196+
var delegate = mock(CommonErrorHandler.class);
197+
given(delegate.seeksAfterHandling()).willReturn(false);
198+
199+
assertThatThrownBy(() -> delegatingErrorHandler.addDelegate(IllegalStateException.class, delegate))
200+
.isInstanceOf(IllegalArgumentException.class)
201+
.hasMessage("All delegates must return the same value when calling 'seeksAfterHandling()'");
202+
}
203+
204+
@Test
205+
void testAddMultipleDelegatesWithOneIncompatible() {
206+
var defaultHandler = mock(CommonErrorHandler.class);
207+
given(defaultHandler.seeksAfterHandling()).willReturn(true);
208+
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
209+
var one = mock(CommonErrorHandler.class);
210+
given(one.seeksAfterHandling()).willReturn(true);
211+
var two = mock(CommonErrorHandler.class);
212+
given(one.seeksAfterHandling()).willReturn(false);
213+
Map<Class<? extends Throwable>, CommonErrorHandler> delegates = Map.of(IllegalStateException.class, one, IOException.class, two);
214+
215+
assertThatThrownBy(() -> delegatingErrorHandler.setErrorHandlers(delegates))
216+
.isInstanceOf(IllegalArgumentException.class)
217+
.hasMessage("All delegates must return the same value when calling 'seeksAfterHandling()'");
218+
}
219+
176220
private Exception wrap(Exception ex) {
177221
return new ListenerExecutionFailedException("test", ex);
178222
}

0 commit comments

Comments
 (0)