Skip to content

Commit 2c9aacc

Browse files
Wzy19930507sobychacko
authored andcommitted
Batch listener does not support observation (#3047)
Fixes: #3047 * because of observation does not support batch listener, add properties `observationEnabled` and replace `this.containerProperties.isObservationEnabled()` for `KafkaMessageListenerContainer.ListenerConsumer`, when batch listener set false to `observationEnabled`. * add notice in micrometer.adoc * add observationEnabled and replace containerProperties.isObservationEnabled() in KafkaMessageListenerContainer.ListenerConsumer **Auto-cherry-pick to `3.1.x`**
1 parent a1f4e26 commit 2c9aacc

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ Using Micrometer for observation is now supported, since version 3.0, for the `K
9595

9696
Set `observationEnabled` to `true` on the `KafkaTemplate` and `ContainerProperties` to enable observation; this will disable xref:kafka/micrometer.adoc[Micrometer Timers] because the timers will now be managed with each observation.
9797

98+
IMPORTANT: Micrometer Observation does not support batch listener; this will enable Micrometer Timers
99+
98100
Refer to https://micrometer.io/docs/tracing[Micrometer Tracing] for more information.
99101

100102
To add tags to timers/traces, configure a custom `KafkaTemplateObservationConvention` or `KafkaListenerObservationConvention` to the template or listener container, respectively.

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
751751

752752
private final MicrometerHolder micrometerHolder;
753753

754+
private final boolean observationEnabled;
755+
754756
private final AtomicBoolean polling = new AtomicBoolean();
755757

756758
private final boolean subBatchPerPartition;
@@ -902,13 +904,15 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
902904
this.isBatchListener = true;
903905
this.wantsFullRecords = this.batchListener.wantsPollResult();
904906
this.pollThreadStateProcessor = setUpPollProcessor(true);
907+
this.observationEnabled = false;
905908
}
906909
else if (listener instanceof MessageListener) {
907910
this.listener = (MessageListener<K, V>) listener;
908911
this.batchListener = null;
909912
this.isBatchListener = false;
910913
this.wantsFullRecords = false;
911914
this.pollThreadStateProcessor = setUpPollProcessor(false);
915+
this.observationEnabled = this.containerProperties.isObservationEnabled();
912916
}
913917
else {
914918
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
@@ -990,7 +994,7 @@ private Object determineBootstrapServers(Properties consumerProperties) {
990994
@Nullable
991995
private KafkaAdmin obtainAdmin() {
992996
KafkaAdmin customAdmin = KafkaMessageListenerContainer.this.thisOrParentContainer.getKafkaAdmin();
993-
if (customAdmin == null && this.containerProperties.isObservationEnabled()) {
997+
if (customAdmin == null && this.observationEnabled) {
994998
ApplicationContext applicationContext = getApplicationContext();
995999
if (applicationContext != null) {
9961000
KafkaAdmin admin = applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
@@ -1270,7 +1274,7 @@ private MicrometerHolder obtainMicrometerHolder() {
12701274
MicrometerHolder holder = null;
12711275
try {
12721276
if (KafkaUtils.MICROMETER_PRESENT && this.containerProperties.isMicrometerEnabled()
1273-
&& !this.containerProperties.isObservationEnabled()) {
1277+
&& !this.observationEnabled) {
12741278

12751279
Function<Object, Map<String, String>> mergedProvider =
12761280
cr -> this.containerProperties.getMicrometerTags();

0 commit comments

Comments
 (0)