Skip to content

Commit bc27ffa

Browse files
artembilanspring-builds
authored andcommitted
GH-3151: Include error handler into a listener observation
Fixes: #3151 After fixing #3049 we are missing an `ErrorHandler` part within an observation. This even cause a retryable topic logic ot be out of an observation scope. * Restore the previous behavior and add `observation.error(e)` when it is not re-thrown in case of `this.commonErrorHandler` presence (cherry picked from commit c24575c)
1 parent fffd5ef commit bc27ffa

File tree

1 file changed

+27
-26
lines changed

1 file changed

+27
-26
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2772,37 +2772,38 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27722772
DefaultKafkaListenerObservationConvention.INSTANCE,
27732773
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
27742774
this.observationRegistry);
2775-
try {
2776-
observation.observe(() -> {
2775+
return observation.observe(() -> {
2776+
try {
27772777
invokeOnMessage(cRecord);
27782778
successTimer(sample, cRecord);
27792779
recordInterceptAfter(cRecord, null);
2780-
});
2781-
}
2782-
catch (RuntimeException e) {
2783-
failureTimer(sample, cRecord);
2784-
recordInterceptAfter(cRecord, e);
2785-
if (this.commonErrorHandler == null) {
2786-
throw e;
2787-
}
2788-
try {
2789-
invokeErrorHandler(cRecord, iterator, e);
2790-
commitOffsetsIfNeededAfterHandlingError(cRecord);
2791-
}
2792-
catch (KafkaException ke) {
2793-
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2794-
return ke;
2795-
}
2796-
catch (RuntimeException ee) {
2797-
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2798-
return ee;
27992780
}
2800-
catch (Error er) { // NOSONAR
2801-
this.logger.error(er, "Error handler threw an error");
2802-
throw er;
2781+
catch (RuntimeException e) {
2782+
failureTimer(sample, cRecord);
2783+
recordInterceptAfter(cRecord, e);
2784+
if (this.commonErrorHandler == null) {
2785+
throw e;
2786+
}
2787+
observation.error(e);
2788+
try {
2789+
invokeErrorHandler(cRecord, iterator, e);
2790+
commitOffsetsIfNeededAfterHandlingError(cRecord);
2791+
}
2792+
catch (KafkaException ke) {
2793+
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2794+
return ke;
2795+
}
2796+
catch (RuntimeException ee) {
2797+
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2798+
return ee;
2799+
}
2800+
catch (Error er) { // NOSONAR
2801+
this.logger.error(er, "Error handler threw an error");
2802+
throw er;
2803+
}
28032804
}
2804-
}
2805-
return null;
2805+
return null;
2806+
});
28062807
}
28072808

28082809
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {

0 commit comments

Comments
 (0)