17
17
package org .springframework .kafka .retrytopic ;
18
18
19
19
import java .time .Clock ;
20
- import java .util .Collections ;
21
20
import java .util .Comparator ;
22
21
import java .util .HashSet ;
23
22
import java .util .List ;
@@ -94,7 +93,7 @@ public class ListenerContainerFactoryConfigurer {
94
93
95
94
private final Clock clock ;
96
95
97
- ListenerContainerFactoryConfigurer (KafkaConsumerBackoffManager kafkaConsumerBackoffManager ,
96
+ public ListenerContainerFactoryConfigurer (KafkaConsumerBackoffManager kafkaConsumerBackoffManager ,
98
97
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory ,
99
98
@ Qualifier (RetryTopicInternalBeanNames
100
99
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME ) Clock clock ) {
@@ -116,7 +115,7 @@ public class ListenerContainerFactoryConfigurer {
116
115
ConcurrentKafkaListenerContainerFactory <?, ?> containerFactory , Configuration configuration ) {
117
116
return isCached (containerFactory )
118
117
? containerFactory
119
- : addToCache (doConfigure (containerFactory , configuration . backOffValues ));
118
+ : addToCache (doConfigure (containerFactory , configuration , true ));
120
119
}
121
120
122
121
/**
@@ -126,14 +125,14 @@ public class ListenerContainerFactoryConfigurer {
126
125
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
127
126
* @return the configured factory instance.
128
127
* @deprecated in favor of
129
- * {@link #decorateFactoryWithoutBackOffValues (ConcurrentKafkaListenerContainerFactory, Configuration)}.
128
+ * {@link #decorateFactoryWithoutSettingContainerProperties (ConcurrentKafkaListenerContainerFactory, Configuration)}.
130
129
*/
131
130
@ Deprecated
132
131
public ConcurrentKafkaListenerContainerFactory <?, ?> configureWithoutBackOffValues (
133
132
ConcurrentKafkaListenerContainerFactory <?, ?> containerFactory , Configuration configuration ) {
134
133
return isCached (containerFactory )
135
134
? containerFactory
136
- : doConfigure (containerFactory , Collections . emptyList () );
135
+ : doConfigure (containerFactory , configuration , false );
137
136
}
138
137
139
138
/**
@@ -144,7 +143,7 @@ public class ListenerContainerFactoryConfigurer {
144
143
*/
145
144
public KafkaListenerContainerFactory <?> decorateFactory (ConcurrentKafkaListenerContainerFactory <?, ?> factory ,
146
145
Configuration configuration ) {
147
- return new RetryTopicListenerContainerFactoryDecorator (factory , configuration . backOffValues );
146
+ return new RetryTopicListenerContainerFactoryDecorator (factory , configuration , true );
148
147
}
149
148
150
149
/**
@@ -154,18 +153,19 @@ public KafkaListenerContainerFactory<?> decorateFactory(ConcurrentKafkaListenerC
154
153
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
155
154
* @return the decorated factory instance.
156
155
*/
157
- public KafkaListenerContainerFactory <?> decorateFactoryWithoutBackOffValues (
156
+ public KafkaListenerContainerFactory <?> decorateFactoryWithoutSettingContainerProperties (
158
157
ConcurrentKafkaListenerContainerFactory <?, ?> factory , Configuration configuration ) {
159
- return new RetryTopicListenerContainerFactoryDecorator (factory , Collections . emptyList () );
158
+ return new RetryTopicListenerContainerFactoryDecorator (factory , configuration , false );
160
159
}
161
160
162
161
private ConcurrentKafkaListenerContainerFactory <?, ?> doConfigure (
163
- ConcurrentKafkaListenerContainerFactory <?, ?> containerFactory , List <Long > backOffValues ) {
162
+ ConcurrentKafkaListenerContainerFactory <?, ?> containerFactory , Configuration configuration ,
163
+ boolean isSetContainerProperties ) {
164
164
165
165
containerFactory
166
- .setContainerCustomizer (container -> setupBackoffAwareMessageListenerAdapter (container , backOffValues ));
166
+ .setContainerCustomizer (container -> setupBackoffAwareMessageListenerAdapter (container , configuration , isSetContainerProperties ));
167
167
containerFactory
168
- .setCommonErrorHandler (createErrorHandler (this .deadLetterPublishingRecovererFactory .create ()));
168
+ .setCommonErrorHandler (createErrorHandler (this .deadLetterPublishingRecovererFactory .create (), configuration ));
169
169
return containerFactory ;
170
170
}
171
171
@@ -191,7 +191,8 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerC
191
191
this .errorHandlerCustomizer = errorHandlerCustomizer ;
192
192
}
193
193
194
- private CommonErrorHandler createErrorHandler (DeadLetterPublishingRecoverer deadLetterPublishingRecoverer ) {
194
+ protected CommonErrorHandler createErrorHandler (DeadLetterPublishingRecoverer deadLetterPublishingRecoverer ,
195
+ Configuration configuration ) {
195
196
DefaultErrorHandler errorHandler = new DefaultErrorHandler (deadLetterPublishingRecoverer ,
196
197
new FixedBackOff (0 , 0 ));
197
198
errorHandler .setCommitRecovered (true );
@@ -200,52 +201,52 @@ private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer dead
200
201
return errorHandler ;
201
202
}
202
203
203
- private void setupBackoffAwareMessageListenerAdapter (ConcurrentMessageListenerContainer <?, ?> container ,
204
- List < Long > backOffValues ) {
204
+ protected void setupBackoffAwareMessageListenerAdapter (ConcurrentMessageListenerContainer <?, ?> container ,
205
+ Configuration configuration , boolean isSetContainerProperties ) {
205
206
AcknowledgingConsumerAwareMessageListener <?, ?> listener = checkAndCast (container .getContainerProperties ()
206
207
.getMessageListener (), AcknowledgingConsumerAwareMessageListener .class );
207
208
208
- configurePollTimeoutAndIdlePartitionInterval (container , backOffValues );
209
+ if (isSetContainerProperties && !configuration .backOffValues .isEmpty ()) {
210
+ configurePollTimeoutAndIdlePartitionInterval (container , configuration );
211
+ }
209
212
210
213
container .setupMessageListener (new KafkaBackoffAwareMessageListenerAdapter <>(listener ,
211
214
this .kafkaConsumerBackoffManager , container .getListenerId (), this .clock )); // NOSONAR
212
215
213
216
this .containerCustomizer .accept (container );
214
217
}
215
218
216
- private void configurePollTimeoutAndIdlePartitionInterval (ConcurrentMessageListenerContainer <?, ?> container ,
217
- List <Long > backOffValues ) {
218
- if (backOffValues .isEmpty ()) {
219
- return ;
220
- }
219
+ protected void configurePollTimeoutAndIdlePartitionInterval (ConcurrentMessageListenerContainer <?, ?> container ,
220
+ Configuration configuration ) {
221
221
222
222
ContainerProperties containerProperties = container .getContainerProperties ();
223
223
224
- long pollTimeoutValue = getPollTimeoutValue (containerProperties , backOffValues );
224
+ long pollTimeoutValue = getPollTimeoutValue (containerProperties , configuration );
225
225
long idlePartitionEventInterval = getIdlePartitionInterval (containerProperties , pollTimeoutValue );
226
226
227
227
LOGGER .debug (() -> "pollTimeout and idlePartitionEventInterval for back off values "
228
- + backOffValues + " will be set to " + pollTimeoutValue
228
+ + configuration . backOffValues + " will be set to " + pollTimeoutValue
229
229
+ " and " + idlePartitionEventInterval );
230
230
231
231
containerProperties
232
232
.setIdlePartitionEventInterval (idlePartitionEventInterval );
233
233
containerProperties .setPollTimeout (pollTimeoutValue );
234
234
}
235
235
236
- private long getIdlePartitionInterval (ContainerProperties containerProperties , long pollTimeoutValue ) {
236
+ protected long getIdlePartitionInterval (ContainerProperties containerProperties , long pollTimeoutValue ) {
237
237
Long idlePartitionEventInterval = containerProperties .getIdlePartitionEventInterval ();
238
238
return idlePartitionEventInterval != null && idlePartitionEventInterval > 0
239
239
? idlePartitionEventInterval
240
240
: pollTimeoutValue ;
241
241
}
242
242
243
- private long getPollTimeoutValue (ContainerProperties containerProperties , List <Long > backOffValues ) {
244
- if (containerProperties .getPollTimeout () != ContainerProperties .DEFAULT_POLL_TIMEOUT ) {
243
+ protected long getPollTimeoutValue (ContainerProperties containerProperties , Configuration configuration ) {
244
+ if (containerProperties .getPollTimeout () != ContainerProperties .DEFAULT_POLL_TIMEOUT
245
+ || configuration .backOffValues .isEmpty ()) {
245
246
return containerProperties .getPollTimeout ();
246
247
}
247
248
248
- Long lowestBackOff = backOffValues
249
+ Long lowestBackOff = configuration . backOffValues
249
250
.stream ()
250
251
.min (Comparator .naturalOrder ())
251
252
.orElseThrow (() -> new IllegalArgumentException ("No back off values found!" ));
@@ -267,14 +268,19 @@ private <T> T checkAndCast(Object obj, Class<T> clazz) {
267
268
return (T ) obj ;
268
269
}
269
270
270
- private class RetryTopicListenerContainerFactoryDecorator implements KafkaListenerContainerFactory <ConcurrentMessageListenerContainer <?, ?>> {
271
+ private class RetryTopicListenerContainerFactoryDecorator
272
+ implements KafkaListenerContainerFactory <ConcurrentMessageListenerContainer <?, ?>> {
271
273
272
274
private final ConcurrentKafkaListenerContainerFactory <?, ?> delegate ;
273
- private final List <Long > backOffValues ;
275
+ private final Configuration configuration ;
276
+ private final boolean isSetContainerProperties ;
274
277
275
- RetryTopicListenerContainerFactoryDecorator (ConcurrentKafkaListenerContainerFactory <?, ?> delegate , List <Long > backOffValues ) {
278
+ RetryTopicListenerContainerFactoryDecorator (ConcurrentKafkaListenerContainerFactory <?, ?> delegate ,
279
+ Configuration configuration ,
280
+ boolean isSetContainerProperties ) {
276
281
this .delegate = delegate ;
277
- this .backOffValues = backOffValues ;
282
+ this .configuration = configuration ;
283
+ this .isSetContainerProperties = isSetContainerProperties ;
278
284
}
279
285
280
286
@ Override
@@ -283,10 +289,11 @@ private class RetryTopicListenerContainerFactoryDecorator implements KafkaListen
283
289
}
284
290
285
291
private ConcurrentMessageListenerContainer <?, ?> decorate (ConcurrentMessageListenerContainer <?, ?> listenerContainer ) {
286
- setupBackoffAwareMessageListenerAdapter (listenerContainer , this .backOffValues );
287
292
listenerContainer
288
293
.setCommonErrorHandler (createErrorHandler (
289
- ListenerContainerFactoryConfigurer .this .deadLetterPublishingRecovererFactory .create ()));
294
+ ListenerContainerFactoryConfigurer .this .deadLetterPublishingRecovererFactory .create (),
295
+ this .configuration ));
296
+ setupBackoffAwareMessageListenerAdapter (listenerContainer , this .configuration , this .isSetContainerProperties );
290
297
return listenerContainer ;
291
298
}
292
299
0 commit comments