48
48
49
49
import org .springframework .beans .factory .BeanNameAware ;
50
50
import org .springframework .beans .factory .DisposableBean ;
51
+ import org .springframework .beans .factory .ObjectProvider ;
52
+ import org .springframework .beans .factory .SmartInitializingSingleton ;
51
53
import org .springframework .context .ApplicationContext ;
52
54
import org .springframework .context .ApplicationContextAware ;
53
55
import org .springframework .context .ApplicationListener ;
62
64
import org .springframework .kafka .support .TopicPartitionOffset ;
63
65
import org .springframework .kafka .support .converter .MessagingMessageConverter ;
64
66
import org .springframework .kafka .support .converter .RecordMessageConverter ;
67
+ import org .springframework .kafka .support .micrometer .DefaultKafkaTemplateObservationConvention ;
68
+ import org .springframework .kafka .support .micrometer .KafkaRecordSenderContext ;
69
+ import org .springframework .kafka .support .micrometer .KafkaTemplateObservation ;
70
+ import org .springframework .kafka .support .micrometer .KafkaTemplateObservationConvention ;
65
71
import org .springframework .kafka .support .micrometer .MicrometerHolder ;
66
72
import org .springframework .lang .Nullable ;
67
73
import org .springframework .messaging .Message ;
68
74
import org .springframework .messaging .converter .SmartMessageConverter ;
69
75
import org .springframework .transaction .support .TransactionSynchronizationManager ;
70
76
import org .springframework .util .Assert ;
71
77
78
+ import io .micrometer .observation .Observation ;
79
+ import io .micrometer .observation .ObservationRegistry ;
80
+
72
81
/**
73
82
* A template for executing high-level operations. When used with a
74
83
* {@link DefaultKafkaProducerFactory}, the template is thread-safe. The producer factory
90
99
*/
91
100
@ SuppressWarnings ("deprecation" )
92
101
public class KafkaTemplate <K , V > implements KafkaOperations <K , V >, ApplicationContextAware , BeanNameAware ,
93
- ApplicationListener <ContextStoppedEvent >, DisposableBean {
102
+ ApplicationListener <ContextStoppedEvent >, DisposableBean , SmartInitializingSingleton {
94
103
95
104
protected final LogAccessor logger = new LogAccessor (LogFactory .getLog (this .getClass ())); //NOSONAR
96
105
@@ -126,11 +135,17 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo
126
135
127
136
private ConsumerFactory <K , V > consumerFactory ;
128
137
129
- private volatile boolean micrometerEnabled = true ;
138
+ private ProducerInterceptor <K , V > producerInterceptor ;
139
+
140
+ private boolean micrometerEnabled = true ;
130
141
131
- private volatile MicrometerHolder micrometerHolder ;
142
+ private MicrometerHolder micrometerHolder ;
132
143
133
- private ProducerInterceptor <K , V > producerInterceptor ;
144
+ private boolean observationEnabled ;
145
+
146
+ private KafkaTemplateObservationConvention observationConvention ;
147
+
148
+ private ObservationRegistry observationRegistry ;
134
149
135
150
/**
136
151
* Create an instance using the supplied producer factory and autoFlush false.
@@ -382,6 +397,37 @@ public void setProducerInterceptor(ProducerInterceptor<K, V> producerInterceptor
382
397
this .producerInterceptor = producerInterceptor ;
383
398
}
384
399
400
+ /**
401
+ * Set to true to enable observation via Micrometer.
402
+ * @param observationEnabled true to enable.
403
+ * @since 3.0
404
+ * @see #setMicrometerEnabled(boolean)
405
+ */
406
+ public void setObservationEnabled (boolean observationEnabled ) {
407
+ this .observationEnabled = observationEnabled ;
408
+ }
409
+
410
+ /**
411
+ * Set a custom {@link KafkaTemplateObservationConvention}.
412
+ * @param observationConvention the convention.
413
+ * @since 3.0
414
+ */
415
+ public void setObservationConvention (KafkaTemplateObservationConvention observationConvention ) {
416
+ this .observationConvention = observationConvention ;
417
+ }
418
+
419
+ @ Override
420
+ public void afterSingletonsInstantiated () {
421
+ if (this .observationEnabled && this .observationRegistry == null && this .applicationContext != null ) {
422
+ ObjectProvider <ObservationRegistry > registry =
423
+ this .applicationContext .getBeanProvider (ObservationRegistry .class );
424
+ this .observationRegistry = registry .getIfUnique ();
425
+ }
426
+ else if (this .micrometerEnabled ) {
427
+ this .micrometerHolder = obtainMicrometerHolder ();
428
+ }
429
+ }
430
+
385
431
@ Override
386
432
public void onApplicationEvent (ContextStoppedEvent event ) {
387
433
if (this .customProducerFactory ) {
@@ -412,33 +458,33 @@ public CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long t
412
458
@ Override
413
459
public CompletableFuture <SendResult <K , V >> send (String topic , @ Nullable V data ) {
414
460
ProducerRecord <K , V > producerRecord = new ProducerRecord <>(topic , data );
415
- return doSend (producerRecord );
461
+ return observeSend (producerRecord );
416
462
}
417
463
418
464
@ Override
419
465
public CompletableFuture <SendResult <K , V >> send (String topic , K key , @ Nullable V data ) {
420
466
ProducerRecord <K , V > producerRecord = new ProducerRecord <>(topic , key , data );
421
- return doSend (producerRecord );
467
+ return observeSend (producerRecord );
422
468
}
423
469
424
470
@ Override
425
471
public CompletableFuture <SendResult <K , V >> send (String topic , Integer partition , K key , @ Nullable V data ) {
426
472
ProducerRecord <K , V > producerRecord = new ProducerRecord <>(topic , partition , key , data );
427
- return doSend (producerRecord );
473
+ return observeSend (producerRecord );
428
474
}
429
475
430
476
@ Override
431
477
public CompletableFuture <SendResult <K , V >> send (String topic , Integer partition , Long timestamp , K key ,
432
478
@ Nullable V data ) {
433
479
434
480
ProducerRecord <K , V > producerRecord = new ProducerRecord <>(topic , partition , timestamp , key , data );
435
- return doSend (producerRecord );
481
+ return observeSend (producerRecord );
436
482
}
437
483
438
484
@ Override
439
485
public CompletableFuture <SendResult <K , V >> send (ProducerRecord <K , V > record ) {
440
486
Assert .notNull (record , "'record' cannot be null" );
441
- return doSend (record );
487
+ return observeSend (record );
442
488
}
443
489
444
490
@ SuppressWarnings ("unchecked" )
@@ -451,7 +497,7 @@ public CompletableFuture<SendResult<K, V>> send(Message<?> message) {
451
497
producerRecord .headers ().add (KafkaHeaders .CORRELATION_ID , correlationId );
452
498
}
453
499
}
454
- return doSend ((ProducerRecord <K , V >) producerRecord );
500
+ return observeSend ((ProducerRecord <K , V >) producerRecord );
455
501
}
456
502
457
503
@@ -621,6 +667,18 @@ protected void closeProducer(Producer<K, V> producer, boolean inTx) {
621
667
}
622
668
}
623
669
670
+ private CompletableFuture <SendResult <K , V >> observeSend (final ProducerRecord <K , V > producerRecord ) {
671
+ Observation observation ;
672
+ if (!this .observationEnabled || this .observationRegistry == null ) {
673
+ observation = Observation .NOOP ;
674
+ }
675
+ else {
676
+ observation = KafkaTemplateObservation .TEMPLATE_OBSERVATION .observation (
677
+ this .observationConvention , DefaultKafkaTemplateObservationConvention .INSTANCE ,
678
+ new KafkaRecordSenderContext (producerRecord , this .beanName ), this .observationRegistry );
679
+ }
680
+ return observation .observe (() -> doSend (producerRecord ));
681
+ }
624
682
/**
625
683
* Send the producer record.
626
684
* @param producerRecord the producer record.
@@ -632,9 +690,6 @@ protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V>
632
690
this .logger .trace (() -> "Sending: " + KafkaUtils .format (producerRecord ));
633
691
final CompletableFuture <SendResult <K , V >> future = new CompletableFuture <>();
634
692
Object sample = null ;
635
- if (this .micrometerEnabled && this .micrometerHolder == null ) {
636
- this .micrometerHolder = obtainMicrometerHolder ();
637
- }
638
693
if (this .micrometerHolder != null ) {
639
694
sample = this .micrometerHolder .start ();
640
695
}
0 commit comments