Skip to content

Commit cace1d6

Browse files
artembilanspring-builds
authored andcommitted
GH-3151: Include error handler into a listener observation
Fixes: #3151 After fixing #3049 we are missing an `ErrorHandler` part within an observation. This even cause a retryable topic logic ot be out of an observation scope. * Restore the previous behavior and add `observation.error(e)` when it is not re-thrown in case of `this.commonErrorHandler` presence (cherry picked from commit c24575c)
1 parent daf11c4 commit cace1d6

File tree

1 file changed

+27
-26
lines changed

1 file changed

+27
-26
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2794,37 +2794,38 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27942794
DefaultKafkaListenerObservationConvention.INSTANCE,
27952795
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
27962796
this.observationRegistry);
2797-
try {
2798-
observation.observe(() -> {
2797+
return observation.observe(() -> {
2798+
try {
27992799
invokeOnMessage(cRecord);
28002800
successTimer(sample, cRecord);
28012801
recordInterceptAfter(cRecord, null);
2802-
});
2803-
}
2804-
catch (RuntimeException e) {
2805-
failureTimer(sample, cRecord);
2806-
recordInterceptAfter(cRecord, e);
2807-
if (this.commonErrorHandler == null) {
2808-
throw e;
2809-
}
2810-
try {
2811-
invokeErrorHandler(cRecord, iterator, e);
2812-
commitOffsetsIfNeededAfterHandlingError(cRecord);
2813-
}
2814-
catch (KafkaException ke) {
2815-
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2816-
return ke;
2817-
}
2818-
catch (RuntimeException ee) {
2819-
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2820-
return ee;
28212802
}
2822-
catch (Error er) { // NOSONAR
2823-
this.logger.error(er, "Error handler threw an error");
2824-
throw er;
2803+
catch (RuntimeException e) {
2804+
failureTimer(sample, cRecord);
2805+
recordInterceptAfter(cRecord, e);
2806+
if (this.commonErrorHandler == null) {
2807+
throw e;
2808+
}
2809+
observation.error(e);
2810+
try {
2811+
invokeErrorHandler(cRecord, iterator, e);
2812+
commitOffsetsIfNeededAfterHandlingError(cRecord);
2813+
}
2814+
catch (KafkaException ke) {
2815+
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2816+
return ke;
2817+
}
2818+
catch (RuntimeException ee) {
2819+
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2820+
return ee;
2821+
}
2822+
catch (Error er) { // NOSONAR
2823+
this.logger.error(er, "Error handler threw an error");
2824+
throw er;
2825+
}
28252826
}
2826-
}
2827-
return null;
2827+
return null;
2828+
});
28282829
}
28292830

28302831
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {

0 commit comments

Comments
 (0)