Skip to content

Commit c24575c

Browse files
committed
GH-3151: Include error handler into a listener observation
Fixes: #3151 After fixing #3049 we are missing an `ErrorHandler` part within an observation. This even cause a retryable topic logic ot be out of an observation scope. * Restore the previous behavior and add `observation.error(e)` when it is not re-thrown in case of `this.commonErrorHandler` presence **Auto-cherry-pick to `3.1.x` & `3.0.x`**
1 parent 8dae233 commit c24575c

File tree

1 file changed

+27
-26
lines changed

1 file changed

+27
-26
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2691,37 +2691,38 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
26912691
DefaultKafkaListenerObservationConvention.INSTANCE,
26922692
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
26932693
this.observationRegistry);
2694-
try {
2695-
observation.observe(() -> {
2694+
return observation.observe(() -> {
2695+
try {
26962696
invokeOnMessage(cRecord);
26972697
successTimer(sample, cRecord);
26982698
recordInterceptAfter(cRecord, null);
2699-
});
2700-
}
2701-
catch (RuntimeException e) {
2702-
failureTimer(sample, cRecord);
2703-
recordInterceptAfter(cRecord, e);
2704-
if (this.commonErrorHandler == null) {
2705-
throw e;
2706-
}
2707-
try {
2708-
invokeErrorHandler(cRecord, iterator, e);
2709-
commitOffsetsIfNeededAfterHandlingError(cRecord);
2710-
}
2711-
catch (KafkaException ke) {
2712-
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2713-
return ke;
2714-
}
2715-
catch (RuntimeException ee) {
2716-
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2717-
return ee;
27182699
}
2719-
catch (Error er) { // NOSONAR
2720-
this.logger.error(er, "Error handler threw an error");
2721-
throw er;
2700+
catch (RuntimeException e) {
2701+
failureTimer(sample, cRecord);
2702+
recordInterceptAfter(cRecord, e);
2703+
if (this.commonErrorHandler == null) {
2704+
throw e;
2705+
}
2706+
observation.error(e);
2707+
try {
2708+
invokeErrorHandler(cRecord, iterator, e);
2709+
commitOffsetsIfNeededAfterHandlingError(cRecord);
2710+
}
2711+
catch (KafkaException ke) {
2712+
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2713+
return ke;
2714+
}
2715+
catch (RuntimeException ee) {
2716+
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2717+
return ee;
2718+
}
2719+
catch (Error er) { // NOSONAR
2720+
this.logger.error(er, "Error handler threw an error");
2721+
throw er;
2722+
}
27222723
}
2723-
}
2724-
return null;
2724+
return null;
2725+
});
27252726
}
27262727

27272728
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {

0 commit comments

Comments
 (0)