Skip to content

Commit 6f32959

Browse files
authored
Batch listener does not support observation (#3047)
Fixes: #3047 * because of observation does not support batch listener, add properties `observationEnabled` and replace `this.containerProperties.isObservationEnabled()` for `KafkaMessageListenerContainer.ListenerConsumer`, when batch listener set false to `observationEnabled`. * add notice in micrometer.adoc * add observationEnabled and replace containerProperties.isObservationEnabled() in KafkaMessageListenerContainer.ListenerConsumer **Auto-cherry-pick to `3.1.x`**
1 parent 7a53b01 commit 6f32959

File tree

2 files changed

+9
-3
lines changed

2 files changed

+9
-3
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ Using Micrometer for observation is now supported, since version 3.0, for the `K
9595

9696
Set `observationEnabled` to `true` on the `KafkaTemplate` and `ContainerProperties` to enable observation; this will disable xref:kafka/micrometer.adoc[Micrometer Timers] because the timers will now be managed with each observation.
9797

98-
Refer to https://docs.micrometer.io/tracing/reference/index.html[Micrometer Tracing] for more information.
98+
IMPORTANT: Micrometer Observation does not support batch listener; this will enable Micrometer Timers
99+
100+
Refer to https://micrometer.io/docs/tracing[Micrometer Tracing] for more information.
99101

100102
To add tags to timers/traces, configure a custom `KafkaTemplateObservationConvention` or `KafkaListenerObservationConvention` to the template or listener container, respectively.
101103

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
758758

759759
private final MicrometerHolder micrometerHolder;
760760

761+
private final boolean observationEnabled;
762+
761763
private final AtomicBoolean polling = new AtomicBoolean();
762764

763765
private final boolean subBatchPerPartition;
@@ -912,13 +914,15 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
912914
this.isBatchListener = true;
913915
this.wantsFullRecords = this.batchListener.wantsPollResult();
914916
this.pollThreadStateProcessor = setUpPollProcessor(true);
917+
this.observationEnabled = false;
915918
}
916919
else if (listener instanceof MessageListener) {
917920
this.listener = (MessageListener<K, V>) listener;
918921
this.batchListener = null;
919922
this.isBatchListener = false;
920923
this.wantsFullRecords = false;
921924
this.pollThreadStateProcessor = setUpPollProcessor(false);
925+
this.observationEnabled = this.containerProperties.isObservationEnabled();
922926
}
923927
else {
924928
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
@@ -999,7 +1003,7 @@ private Object determineBootstrapServers(Properties consumerProperties) {
9991003
@Nullable
10001004
private KafkaAdmin obtainAdmin() {
10011005
KafkaAdmin customAdmin = KafkaMessageListenerContainer.this.thisOrParentContainer.getKafkaAdmin();
1002-
if (customAdmin == null && this.containerProperties.isObservationEnabled()) {
1006+
if (customAdmin == null && this.observationEnabled) {
10031007
ApplicationContext applicationContext = getApplicationContext();
10041008
if (applicationContext != null) {
10051009
KafkaAdmin admin = applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
@@ -1279,7 +1283,7 @@ private MicrometerHolder obtainMicrometerHolder() {
12791283
MicrometerHolder holder = null;
12801284
try {
12811285
if (KafkaUtils.MICROMETER_PRESENT && this.containerProperties.isMicrometerEnabled()
1282-
&& !this.containerProperties.isObservationEnabled()) {
1286+
&& !this.observationEnabled) {
12831287

12841288
Function<Object, Map<String, String>> mergedProvider =
12851289
cr -> this.containerProperties.getMicrometerTags();

0 commit comments

Comments
 (0)