61
61
import org .springframework .kafka .support .TopicPartitionOffset ;
62
62
import org .springframework .kafka .support .converter .MessagingMessageConverter ;
63
63
import org .springframework .kafka .support .converter .RecordMessageConverter ;
64
- import org .springframework .kafka .support .micrometer .MicrometerHolder ;
64
+ import org .springframework .kafka .support .micrometer .KafkaMetrics ;
65
+ import org .springframework .kafka .support .micrometer .KafkaTemplateObservation ;
65
66
import org .springframework .lang .Nullable ;
66
67
import org .springframework .messaging .Message ;
67
68
import org .springframework .messaging .converter .SmartMessageConverter ;
68
69
import org .springframework .transaction .support .TransactionSynchronizationManager ;
69
70
import org .springframework .util .Assert ;
71
+ import org .springframework .util .ObjectUtils ;
70
72
import org .springframework .util .concurrent .ListenableFuture ;
71
73
import org .springframework .util .concurrent .SettableListenableFuture ;
72
74
75
+ import io .micrometer .observation .Observation ;
76
+ import io .micrometer .observation .Observation .TagsProvider ;
77
+
73
78
74
79
/**
75
80
* A template for executing high-level operations. When used with a
@@ -106,9 +111,9 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo
106
111
107
112
private final Map <String , String > micrometerTags = new HashMap <>();
108
113
109
- private String beanName = "kafkaTemplate" ;
114
+ private final Observation . Context observationContext = new Observation . Context () ;
110
115
111
- private ApplicationContext applicationContext ;
116
+ private String beanName = "kafkaTemplate" ;
112
117
113
118
private RecordMessageConverter messageConverter = new MessagingMessageConverter ();
114
119
@@ -126,12 +131,10 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo
126
131
127
132
private ConsumerFactory <K , V > consumerFactory ;
128
133
129
- private volatile boolean micrometerEnabled = true ;
130
-
131
- private volatile MicrometerHolder micrometerHolder ;
132
-
133
134
private ProducerInterceptor <K , V > producerInterceptor ;
134
135
136
+ private TagsProvider <?> tagsProvider = KafkaMetrics .templateTagsProvider (this .micrometerTags );
137
+
135
138
/**
136
139
* Create an instance using the supplied producer factory and autoFlush false.
137
140
* @param producerFactory the producer factory.
@@ -197,7 +200,6 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
197
200
198
201
Assert .notNull (producerFactory , "'producerFactory' cannot be null" );
199
202
this .autoFlush = autoFlush ;
200
- this .micrometerEnabled = KafkaUtils .MICROMETER_PRESENT ;
201
203
this .customProducerFactory = configOverrides != null && configOverrides .size () > 0 ;
202
204
if (this .customProducerFactory ) {
203
205
this .producerFactory = producerFactory .copyWithConfigurationOverride (configOverrides );
@@ -211,11 +213,11 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
211
213
@ Override
212
214
public void setBeanName (String name ) {
213
215
this .beanName = name ;
216
+ this .observationContext .put (KafkaTemplateObservation .TemplateLowCardinalityTags .BEAN_NAME .getKey (), name );
214
217
}
215
218
216
219
@ Override
217
220
public void setApplicationContext (ApplicationContext applicationContext ) {
218
- this .applicationContext = applicationContext ;
219
221
if (this .customProducerFactory ) {
220
222
((DefaultKafkaProducerFactory <K , V >) this .producerFactory ).setApplicationContext (applicationContext );
221
223
}
@@ -327,9 +329,10 @@ public boolean isAllowNonTransactional() {
327
329
* Set to false to disable micrometer timers, if micrometer is on the class path.
328
330
* @param micrometerEnabled false to disable.
329
331
* @since 2.5
332
+ * @deprecated this is no longer used.
330
333
*/
334
+ @ Deprecated
331
335
public void setMicrometerEnabled (boolean micrometerEnabled ) {
332
- this .micrometerEnabled = micrometerEnabled ;
333
336
}
334
337
335
338
/**
@@ -338,8 +341,9 @@ public void setMicrometerEnabled(boolean micrometerEnabled) {
338
341
* @since 2.5
339
342
*/
340
343
public void setMicrometerTags (Map <String , String > tags ) {
341
- if (tags != null ) {
344
+ if (! ObjectUtils . isEmpty ( tags ) ) {
342
345
this .micrometerTags .putAll (tags );
346
+ this .tagsProvider = KafkaMetrics .templateTagsProvider (this .micrometerTags );
343
347
}
344
348
}
345
349
@@ -625,18 +629,16 @@ protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> p
625
629
final Producer <K , V > producer = getTheProducer (producerRecord .topic ());
626
630
this .logger .trace (() -> "Sending: " + KafkaUtils .format (producerRecord ));
627
631
final SettableListenableFuture <SendResult <K , V >> future = new SettableListenableFuture <>();
628
- Object sample = null ;
629
- if (this .micrometerEnabled && this .micrometerHolder == null ) {
630
- this .micrometerHolder = obtainMicrometerHolder ();
631
- }
632
- if (this .micrometerHolder != null ) {
633
- sample = this .micrometerHolder .start ();
634
- }
632
+ Observation observation = Observation
633
+ .createNotStarted (KafkaTemplateObservation .TEMPLATE_OBSERVATION .getName (),
634
+ this .observationContext , KafkaMetrics .OBSERVATION_REGISTRY )
635
+ .tagsProvider (this .tagsProvider )
636
+ .start ();
635
637
if (this .producerInterceptor != null ) {
636
638
this .producerInterceptor .onSend (producerRecord );
637
639
}
638
640
Future <RecordMetadata > sendFuture =
639
- producer .send (producerRecord , buildCallback (producerRecord , producer , future , sample ));
641
+ producer .send (producerRecord , buildCallback (producerRecord , producer , future , observation ));
640
642
// May be an immediate failure
641
643
if (sendFuture .isDone ()) {
642
644
try {
@@ -658,7 +660,7 @@ protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> p
658
660
}
659
661
660
662
private Callback buildCallback (final ProducerRecord <K , V > producerRecord , final Producer <K , V > producer ,
661
- final SettableListenableFuture <SendResult <K , V >> future , @ Nullable Object sample ) {
663
+ final SettableListenableFuture <SendResult <K , V >> future , Observation observation ) {
662
664
663
665
return (metadata , exception ) -> {
664
666
try {
@@ -671,9 +673,6 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
671
673
}
672
674
try {
673
675
if (exception == null ) {
674
- if (sample != null ) {
675
- this .micrometerHolder .success (sample );
676
- }
677
676
future .set (new SendResult <>(producerRecord , metadata ));
678
677
if (KafkaTemplate .this .producerListener != null ) {
679
678
KafkaTemplate .this .producerListener .onSuccess (producerRecord , metadata );
@@ -682,9 +681,7 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
682
681
+ ", metadata: " + metadata );
683
682
}
684
683
else {
685
- if (sample != null ) {
686
- this .micrometerHolder .failure (sample , exception .getClass ().getSimpleName ());
687
- }
684
+ observation .error (exception );
688
685
future .setException (new KafkaProducerException (producerRecord , "Failed to send" , exception ));
689
686
if (KafkaTemplate .this .producerListener != null ) {
690
687
KafkaTemplate .this .producerListener .onError (producerRecord , metadata , exception );
@@ -694,6 +691,7 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
694
691
}
695
692
}
696
693
finally {
694
+ observation .stop ();
697
695
if (!KafkaTemplate .this .transactional ) {
698
696
closeProducer (producer , false );
699
697
}
@@ -753,27 +751,8 @@ else if (topic == null) {
753
751
}
754
752
}
755
753
756
- @ Nullable
757
- private MicrometerHolder obtainMicrometerHolder () {
758
- MicrometerHolder holder = null ;
759
- try {
760
- if (KafkaUtils .MICROMETER_PRESENT ) {
761
- holder = new MicrometerHolder (this .applicationContext , this .beanName ,
762
- "spring.kafka.template" , "KafkaTemplate Timer" ,
763
- this .micrometerTags );
764
- }
765
- }
766
- catch (@ SuppressWarnings ("unused" ) IllegalStateException ex ) {
767
- this .micrometerEnabled = false ;
768
- }
769
- return holder ;
770
- }
771
-
772
754
@ Override
773
755
public void destroy () {
774
- if (this .micrometerHolder != null ) {
775
- this .micrometerHolder .destroy ();
776
- }
777
756
if (this .customProducerFactory ) {
778
757
((DefaultKafkaProducerFactory <K , V >) this .producerFactory ).destroy ();
779
758
}
0 commit comments