@@ -44,12 +44,12 @@
 * the listener throws a {@link BatchListenerFailedException}, the offsets prior to the
 * failed record are committed and the remaining records have seeks performed. When the
 * retries are exhausted, the failed record is sent to the recoverer instead of being
- * included in the seeks. If other exceptions are thrown processing is delegated to the
- * fallback handler.
+ * included in the seeks. If other exceptions are thrown, the fallback handler takes over processing.
 *
 * @author Gary Russell
 * @author Francois Rosiere
 * @author Wang Zhiyang
+ * @author Artem Bilan
 * @since 2.8
 *
 */
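
Editorial note: the commit-then-seek behavior this javadoc describes is triggered by the listener throwing `BatchListenerFailedException` with the index (or record) that failed. A minimal sketch, assuming a hypothetical listener id, topic, and `process()` helper:

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.BatchListenerFailedException;

public class DemoBatchListener {

	@KafkaListener(id = "batchDemo", topics = "demo")
	public void listen(List<ConsumerRecord<String, String>> records) {
		for (int i = 0; i < records.size(); i++) {
			try {
				process(records.get(i));
			}
			catch (Exception ex) {
				// The index tells FailedBatchProcessor where to split the batch:
				// offsets before it are committed; it and the rest have seeks performed.
				throw new BatchListenerFailedException("record processing failed", ex, i);
			}
		}
	}

	private void process(ConsumerRecord<String, String> record) {
		// hypothetical per-record business logic
	}
}
```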
@@ -63,10 +63,10 @@ public abstract class FailedBatchProcessor extends FailedRecordProcessor {
	 * Construct an instance with the provided properties.
	 * @param recoverer the recoverer.
	 * @param backOff the back off.
-	 * @param fallbackHandler the fall back handler.
+	 * @param fallbackHandler the fallback handler.
	 */
	public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
-			CommonErrorHandler fallbackHandler) {
+			CommonErrorHandler fallbackHandler) {

		this(recoverer, backOff, null, fallbackHandler);
	}
@@ -76,11 +76,11 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
	 * @param recoverer the recoverer.
	 * @param backOff the back off.
	 * @param backOffHandler the {@link BackOffHandler}
-	 * @param fallbackHandler the fall back handler.
+	 * @param fallbackHandler the fallback handler.
	 * @since 2.9
	 */
	public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
-			@Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) {
+			@Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) {

		super(recoverer, backOff, backOffHandler);
		this.fallbackBatchHandler = fallbackHandler;
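
Editorial note: applications normally reach these constructors through `DefaultErrorHandler`, a concrete subclass of this class. A sketch, assuming a `kafkaTemplate` bean and a listener container `factory` are in scope:

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class ErrorHandlerConfig {

	// Retry a failing batch record twice, 1 second apart, then publish it to a
	// dead-letter topic instead of seeking it again.
	void configure(KafkaTemplate<Object, Object> kafkaTemplate,
			ConcurrentKafkaListenerContainerFactory<Object, Object> factory) {

		DefaultErrorHandler errorHandler = new DefaultErrorHandler(
				new DeadLetterPublishingRecoverer(kafkaTemplate), // the recoverer
				new FixedBackOff(1000L, 2L));                     // the back off
		factory.setCommonErrorHandler(errorHandler);
	}

}
```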
@@ -103,7 +103,7 @@ public void setLogLevel(Level logLevel) {
	}

	/**
-	 * Set to false to not reclassify the exception if different from the previous
+	 * Set to {@code false} to not reclassify the exception if different from the previous
	 * failure. If the changed exception is classified as retryable, the existing back off
	 * sequence is used; a new sequence is not started. Default true. Only applies when
	 * the fallback batch error handler (for exceptions other than
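
Editorial note: the toggle documented in this (truncated) javadoc would be used as below. The setter name `setReclassifyOnExceptionChange` is inferred from the javadoc wording and is an assumption to verify against the merged source; `DefaultErrorHandler` and `FixedBackOff` are as in the previous sketch.

```java
DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(500L, 3L));
// Assumed setter for the property documented above: keep the original
// classification (and the existing back-off sequence) even if a retry
// throws a different exception type.
errorHandler.setReclassifyOnExceptionChange(false);
```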
@@ -195,7 +195,7 @@ private void fallback(Exception thrownException, ConsumerRecords<?, ?> data, Con
		this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
	}

-	private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
+	private int findIndex(ConsumerRecords<?, ?> data, @Nullable ConsumerRecord<?, ?> record) {
		if (record == null) {
			return -1;
		}
@@ -229,57 +229,60 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
				remaining.add(datum);
			}
		}
+
		try {
-			if (offsets.size() > 0) {
+			if (!offsets.isEmpty()) {
				commit(consumer, container, offsets);
			}
		}
-		finally {
-			if (isSeekAfterError()) {
-				if (remaining.size() > 0) {
-					SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
-							getFailureTracker(), this.logger, getLogLevel());
-					ConsumerRecord<?, ?> recovered = remaining.get(0);
-					commit(consumer, container,
-							Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
-									ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
-					if (remaining.size() > 1) {
-						throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
-					}
+		catch (Exception ex) {
+			// Ignore and follow with seek below
+		}
+
+		if (isSeekAfterError()) {
+			if (!remaining.isEmpty()) {
+				SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
+						getFailureTracker(), this.logger, getLogLevel());
+				ConsumerRecord<?, ?> recovered = remaining.get(0);
+				commit(consumer, container,
+						Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
+								ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
+				if (remaining.size() > 1) {
+					throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
				}
			}
-			return ConsumerRecords.empty();
		}
-		else {
-			if (remaining.size() > 0) {
-				try {
-					if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
-							consumer)) {
-						remaining.remove(0);
-					}
-				}
-				catch (Exception e) {
-					if (SeekUtils.isBackoffException(thrownException)) {
-						this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
-								+ " included in remaining due to retry back off " + thrownException);
-					}
-					else {
-						this.logger.error(e, KafkaUtils.format(remaining.get(0))
-								+ " included in remaining due to " + thrownException);
-					}
+			return ConsumerRecords.empty();
+		}
+		else {
+			if (!remaining.isEmpty()) {
+				try {
+					if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
							consumer)) {
						remaining.remove(0);
					}
				}
-			if (remaining.isEmpty()) {
-				return ConsumerRecords.empty();
+				catch (Exception e) {
+					if (SeekUtils.isBackoffException(thrownException)) {
+						this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
								+ " included in remaining due to retry back off " + thrownException);
					}
+					else {
+						this.logger.error(e, KafkaUtils.format(remaining.get(0))
								+ " included in remaining due to " + thrownException);
					}
				}
			}
-			Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
-			remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
-					tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
-			return new ConsumerRecords<>(remains);
		}
+		if (remaining.isEmpty()) {
+			return ConsumerRecords.empty();
+		}
+		Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
+		remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
+				tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
+		return new ConsumerRecords<>(remains);
	}

-	private void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
+	private static void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
			Map<TopicPartition, OffsetAndMetadata> offsets) {

		ContainerProperties properties = container.getContainerProperties();
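
Editorial note: the `isSeekAfterError()` branch in the hunk above is the split between broker redelivery (seek) and in-memory retry of the remaining records. A sketch of toggling it from user code via `DefaultErrorHandler`, which inherits the setter; `FixedBackOff` as in the earlier sketches:

```java
DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 3L));
// false: keep the remaining records in memory and re-invoke the listener with
// them (the else branch above), instead of seeking so the broker redelivers
// them on the next poll (the default, true).
errorHandler.setSeekAfterError(false);
```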
@@ -296,7 +299,7 @@ private void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
	}

	@Nullable
-	private BatchListenerFailedException getBatchListenerFailedException(Throwable throwableArg) {
+	private static BatchListenerFailedException getBatchListenerFailedException(@Nullable Throwable throwableArg) {
		if (throwableArg == null || throwableArg instanceof BatchListenerFailedException) {
			return (BatchListenerFailedException) throwableArg;
		}