61
61
import org .springframework .kafka .support .TopicPartitionOffset ;
62
62
import org .springframework .kafka .support .converter .MessagingMessageConverter ;
63
63
import org .springframework .kafka .support .converter .RecordMessageConverter ;
64
- import org .springframework .kafka .support .micrometer .MicrometerHolder ;
64
+ import org .springframework .kafka .support .micrometer .KafkaMetrics ;
65
+ import org .springframework .kafka .support .micrometer .KafkaTemplateObservation ;
65
66
import org .springframework .lang .Nullable ;
66
67
import org .springframework .messaging .Message ;
67
68
import org .springframework .messaging .converter .SmartMessageConverter ;
68
69
import org .springframework .transaction .support .TransactionSynchronizationManager ;
69
70
import org .springframework .util .Assert ;
71
+ import org .springframework .util .ObjectUtils ;
70
72
import org .springframework .util .concurrent .ListenableFuture ;
71
73
import org .springframework .util .concurrent .SettableListenableFuture ;
72
74
75
+ import io .micrometer .observation .Observation ;
76
+ import io .micrometer .observation .Observation .TagsProvider ;
77
+ import io .micrometer .observation .ObservationRegistry ;
78
+
73
79
74
80
/**
75
81
* A template for executing high-level operations. When used with a
@@ -106,9 +112,9 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo
106
112
107
113
private final Map <String , String > micrometerTags = new HashMap <>();
108
114
109
- private String beanName = "kafkaTemplate" ;
115
+ private final Observation . Context observationContext = new Observation . Context () ;
110
116
111
- private ApplicationContext applicationContext ;
117
+ private String beanName = "kafkaTemplate" ;
112
118
113
119
private RecordMessageConverter messageConverter = new MessagingMessageConverter ();
114
120
@@ -126,11 +132,11 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo
126
132
127
133
private ConsumerFactory <K , V > consumerFactory ;
128
134
129
- private volatile boolean micrometerEnabled = true ;
135
+ private ProducerInterceptor < K , V > producerInterceptor ;
130
136
131
- private volatile MicrometerHolder micrometerHolder ;
137
+ private TagsProvider <?> tagsProvider = KafkaMetrics . templateTagsProvider ( this . micrometerTags ) ;
132
138
133
- private ProducerInterceptor < K , V > producerInterceptor ;
139
+ private ObservationRegistry observationRegistry ;
134
140
135
141
/**
136
142
* Create an instance using the supplied producer factory and autoFlush false.
@@ -197,7 +203,6 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
197
203
198
204
Assert .notNull (producerFactory , "'producerFactory' cannot be null" );
199
205
this .autoFlush = autoFlush ;
200
- this .micrometerEnabled = KafkaUtils .MICROMETER_PRESENT ;
201
206
this .customProducerFactory = configOverrides != null && configOverrides .size () > 0 ;
202
207
if (this .customProducerFactory ) {
203
208
this .producerFactory = producerFactory .copyWithConfigurationOverride (configOverrides );
@@ -211,11 +216,12 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
211
216
@ Override
212
217
public void setBeanName (String name ) {
213
218
this .beanName = name ;
219
+ this .observationContext .put (KafkaTemplateObservation .TemplateLowCardinalityTags .BEAN_NAME .getKey (), name );
214
220
}
215
221
216
222
@ Override
217
223
public void setApplicationContext (ApplicationContext applicationContext ) {
218
- this .applicationContext = applicationContext ;
224
+ this .observationRegistry = KafkaMetrics . getRegistry ( applicationContext ) ;
219
225
if (this .customProducerFactory ) {
220
226
((DefaultKafkaProducerFactory <K , V >) this .producerFactory ).setApplicationContext (applicationContext );
221
227
}
@@ -327,9 +333,10 @@ public boolean isAllowNonTransactional() {
327
333
* Set to false to disable micrometer timers, if micrometer is on the class path.
328
334
* @param micrometerEnabled false to disable.
329
335
* @since 2.5
336
+ * @deprecated this is no longer used.
330
337
*/
338
+ @ Deprecated
331
339
public void setMicrometerEnabled (boolean micrometerEnabled ) {
332
- this .micrometerEnabled = micrometerEnabled ;
333
340
}
334
341
335
342
/**
@@ -338,8 +345,9 @@ public void setMicrometerEnabled(boolean micrometerEnabled) {
338
345
* @since 2.5
339
346
*/
340
347
public void setMicrometerTags (Map <String , String > tags ) {
341
- if (tags != null ) {
348
+ if (! ObjectUtils . isEmpty ( tags ) ) {
342
349
this .micrometerTags .putAll (tags );
350
+ this .tagsProvider = KafkaMetrics .templateTagsProvider (this .micrometerTags );
343
351
}
344
352
}
345
353
@@ -625,18 +633,19 @@ protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> p
625
633
final Producer <K , V > producer = getTheProducer (producerRecord .topic ());
626
634
this .logger .trace (() -> "Sending: " + KafkaUtils .format (producerRecord ));
627
635
final SettableListenableFuture <SendResult <K , V >> future = new SettableListenableFuture <>();
628
- Object sample = null ;
629
- if (this .micrometerEnabled && this .micrometerHolder == null ) {
630
- this .micrometerHolder = obtainMicrometerHolder ();
631
- }
632
- if (this .micrometerHolder != null ) {
633
- sample = this .micrometerHolder .start ();
636
+ Observation observation = null ;
637
+ if (this .observationRegistry != null ) {
638
+ observation = Observation
639
+ .createNotStarted (KafkaTemplateObservation .TEMPLATE_OBSERVATION .getName (),
640
+ this .observationContext , this .observationRegistry )
641
+ .tagsProvider (this .tagsProvider )
642
+ .start ();
634
643
}
635
644
if (this .producerInterceptor != null ) {
636
645
this .producerInterceptor .onSend (producerRecord );
637
646
}
638
647
Future <RecordMetadata > sendFuture =
639
- producer .send (producerRecord , buildCallback (producerRecord , producer , future , sample ));
648
+ producer .send (producerRecord , buildCallback (producerRecord , producer , future , observation ));
640
649
// May be an immediate failure
641
650
if (sendFuture .isDone ()) {
642
651
try {
@@ -658,7 +667,7 @@ protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> p
658
667
}
659
668
660
669
private Callback buildCallback (final ProducerRecord <K , V > producerRecord , final Producer <K , V > producer ,
661
- final SettableListenableFuture <SendResult <K , V >> future , @ Nullable Object sample ) {
670
+ final SettableListenableFuture <SendResult <K , V >> future , @ Nullable Observation observation ) {
662
671
663
672
return (metadata , exception ) -> {
664
673
try {
@@ -671,9 +680,6 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
671
680
}
672
681
try {
673
682
if (exception == null ) {
674
- if (sample != null ) {
675
- this .micrometerHolder .success (sample );
676
- }
677
683
future .set (new SendResult <>(producerRecord , metadata ));
678
684
if (KafkaTemplate .this .producerListener != null ) {
679
685
KafkaTemplate .this .producerListener .onSuccess (producerRecord , metadata );
@@ -682,8 +688,8 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
682
688
+ ", metadata: " + metadata );
683
689
}
684
690
else {
685
- if (sample != null ) {
686
- this . micrometerHolder . failure ( sample , exception . getClass (). getSimpleName () );
691
+ if (observation != null ) {
692
+ observation . error ( exception );
687
693
}
688
694
future .setException (new KafkaProducerException (producerRecord , "Failed to send" , exception ));
689
695
if (KafkaTemplate .this .producerListener != null ) {
@@ -694,6 +700,9 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
694
700
}
695
701
}
696
702
finally {
703
+ if (observation != null ) {
704
+ observation .stop ();
705
+ }
697
706
if (!KafkaTemplate .this .transactional ) {
698
707
closeProducer (producer , false );
699
708
}
@@ -753,27 +762,8 @@ else if (topic == null) {
753
762
}
754
763
}
755
764
756
- @ Nullable
757
- private MicrometerHolder obtainMicrometerHolder () {
758
- MicrometerHolder holder = null ;
759
- try {
760
- if (KafkaUtils .MICROMETER_PRESENT ) {
761
- holder = new MicrometerHolder (this .applicationContext , this .beanName ,
762
- "spring.kafka.template" , "KafkaTemplate Timer" ,
763
- this .micrometerTags );
764
- }
765
- }
766
- catch (@ SuppressWarnings ("unused" ) IllegalStateException ex ) {
767
- this .micrometerEnabled = false ;
768
- }
769
- return holder ;
770
- }
771
-
772
765
@ Override
773
766
public void destroy () {
774
- if (this .micrometerHolder != null ) {
775
- this .micrometerHolder .destroy ();
776
- }
777
767
if (this .customProducerFactory ) {
778
768
((DefaultKafkaProducerFactory <K , V >) this .producerFactory ).destroy ();
779
769
}
0 commit comments