13
13
14
14
package com .rabbitmq .stream .perf ;
15
15
16
- import com .codahale .metrics .*;
16
+ import com .codahale .metrics .ConsoleReporter ;
17
+ import io .micrometer .core .instrument .Counter ;
17
18
import io .micrometer .core .instrument .Timer ;
18
19
import io .micrometer .core .instrument .composite .CompositeMeterRegistry ;
19
- import io .micrometer .core .instrument .dropwizard .DropwizardConfig ;
20
+ import io .micrometer .core .instrument .distribution .HistogramSnapshot ;
21
+ import io .micrometer .core .instrument .distribution .HistogramSupport ;
22
+ import io .micrometer .core .instrument .distribution .ValueAtPercentile ;
20
23
import io .micrometer .core .instrument .dropwizard .DropwizardMeterRegistry ;
21
- import io .micrometer .core .instrument .util .HierarchicalNameMapper ;
22
24
import java .io .*;
23
25
import java .nio .file .Files ;
24
26
import java .nio .file .Path ;
28
30
import java .text .StringCharacterIterator ;
29
31
import java .time .Duration ;
30
32
import java .util .*;
33
+ import java .util .Map .Entry ;
31
34
import java .util .concurrent .ConcurrentHashMap ;
32
35
import java .util .concurrent .Executors ;
33
36
import java .util .concurrent .ScheduledExecutorService ;
@@ -45,7 +48,8 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
45
48
46
49
private static final Logger LOGGER = LoggerFactory .getLogger (DefaultPerformanceMetrics .class );
47
50
48
- private final MetricRegistry metricRegistry ;
51
+ private final String metricsPrefix ;
52
+ private final CompositeMeterRegistry meterRegistry ;
49
53
private final Timer latency , confirmLatency ;
50
54
private final boolean summaryFile ;
51
55
private final PrintWriter out ;
@@ -55,7 +59,6 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
55
59
private volatile long lastPublishedCount = 0 ;
56
60
private volatile long lastConsumedCount = 0 ;
57
61
private volatile long offset ;
58
- private final String metricsSuffix ;
59
62
60
63
DefaultPerformanceMetrics (
61
64
CompositeMeterRegistry meterRegistry ,
@@ -69,31 +72,9 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
69
72
this .includeByteRates = includeByteRates ;
70
73
this .memoryReportSupplier = memoryReportSupplier ;
71
74
this .out = out ;
72
- DropwizardConfig dropwizardConfig =
73
- new DropwizardConfig () {
74
- @ Override
75
- public String prefix () {
76
- return "" ;
77
- }
75
+ this .metricsPrefix = metricsPrefix ;
76
+ this .meterRegistry = meterRegistry ;
78
77
79
- @ Override
80
- public String get (String key ) {
81
- return null ;
82
- }
83
- };
84
- this .metricRegistry = new MetricRegistry ();
85
- DropwizardMeterRegistry dropwizardMeterRegistry =
86
- new DropwizardMeterRegistry (
87
- dropwizardConfig ,
88
- this .metricRegistry ,
89
- HierarchicalNameMapper .DEFAULT ,
90
- io .micrometer .core .instrument .Clock .SYSTEM ) {
91
- @ Override
92
- protected Double nullGaugeValue () {
93
- return null ;
94
- }
95
- };
96
- meterRegistry .add (dropwizardMeterRegistry );
97
78
this .latency =
98
79
Timer .builder (metricsPrefix + ".latency" )
99
80
.description ("message latency" )
@@ -112,36 +93,28 @@ protected Double nullGaugeValue() {
112
93
} else {
113
94
this .confirmLatency = null ;
114
95
}
115
- // the metrics name contains the tags, if any,
116
- // so we extract the suffix to use it later when looking up the metrics
117
- String key = metricRegistry .getMeters ().keySet ().iterator ().next ();
118
- int index = key .indexOf ("." );
119
- this .metricsSuffix = index == -1 ? "" : key .substring (index );
120
96
}
121
97
122
98
private long getPublishedCount () {
123
- return this .metricRegistry
124
- .getMeters ()
125
- .get ("rabbitmqStreamPublished" + metricsSuffix )
126
- .getCount ();
99
+ return (long ) this .meterRegistry .get (metricsName ("published" )).counter ().count ();
127
100
}
128
101
129
102
private long getConsumedCount () {
130
- return this . metricRegistry . getMeters (). get ("rabbitmqStreamConsumed" + metricsSuffix ). getCount ();
103
+ return ( long ) this . meterRegistry . get (metricsName ( "consumed" )). counter (). count ();
131
104
}
132
105
133
106
@ Override
134
107
public void start (String description ) throws Exception {
135
108
long startTime = System .nanoTime ();
136
109
137
- String metricPublished = "rabbitmqStreamPublished" + metricsSuffix ;
138
- String metricProducerConfirmed = "rabbitmqStreamProducer_confirmed" + metricsSuffix ;
139
- String metricConsumed = "rabbitmqStreamConsumed" + metricsSuffix ;
140
- String metricChunkSize = "rabbitmqStreamChunk_size" + metricsSuffix ;
141
- String metricLatency = "rabbitmqStreamLatency" + metricsSuffix ;
142
- String metricConfirmLatency = "rabbitmqStreamConfirm_latency" + metricsSuffix ;
143
- String metricWrittenBytes = "rabbitmqStreamWritten_bytes" + metricsSuffix ;
144
- String metricReadBytes = "rabbitmqStreamRead_bytes" + metricsSuffix ;
110
+ String metricPublished = metricsName ( "published" ) ;
111
+ String metricProducerConfirmed = metricsName ( "confirmed" ) ;
112
+ String metricConsumed = metricsName ( "consumed" ) ;
113
+ String metricChunkSize = metricsName ( "chunk_size" ) ;
114
+ String metricLatency = metricsName ( "latency" ) ;
115
+ String metricConfirmLatency = metricsName ( "confirm_latency" ) ;
116
+ String metricWrittenBytes = metricsName ( "written_bytes" ) ;
117
+ String metricReadBytes = metricsName ( "read_bytes" ) ;
145
118
146
119
Set <String > allMetrics =
147
120
new HashSet <>(
@@ -156,16 +129,16 @@ public void start(String description) throws Exception {
156
129
allMetrics .add (metricConfirmLatency );
157
130
}
158
131
159
- Map <String , String > metersNamesAndLabels = new LinkedHashMap <>();
160
- metersNamesAndLabels .put (metricPublished , "published" );
161
- metersNamesAndLabels .put (metricProducerConfirmed , "confirmed" );
162
- metersNamesAndLabels .put (metricConsumed , "consumed" );
132
+ Map <String , String > countersNamesAndLabels = new LinkedHashMap <>();
133
+ countersNamesAndLabels .put (metricPublished , "published" );
134
+ countersNamesAndLabels .put (metricProducerConfirmed , "confirmed" );
135
+ countersNamesAndLabels .put (metricConsumed , "consumed" );
163
136
164
137
if (this .includeByteRates ) {
165
138
allMetrics .add (metricWrittenBytes );
166
139
allMetrics .add (metricReadBytes );
167
- metersNamesAndLabels .put (metricWrittenBytes , "written bytes" );
168
- metersNamesAndLabels .put (metricReadBytes , "read bytes" );
140
+ countersNamesAndLabels .put (metricWrittenBytes , "written bytes" );
141
+ countersNamesAndLabels .put (metricReadBytes , "read bytes" );
169
142
}
170
143
171
144
ScheduledExecutorService scheduledExecutorService =
@@ -174,65 +147,60 @@ public void start(String description) throws Exception {
174
147
Closeable summaryFileClosingSequence =
175
148
maybeSetSummaryFile (description , allMetrics , scheduledExecutorService );
176
149
177
- SortedMap <String , Meter > registryMeters = metricRegistry .getMeters ();
178
-
179
- Map <String , Meter > meters = new LinkedHashMap <>(metersNamesAndLabels .size ());
180
- metersNamesAndLabels
150
+ Map <String , Counter > counters = new LinkedHashMap <>(countersNamesAndLabels .size ());
151
+ countersNamesAndLabels
181
152
.entrySet ()
182
- .forEach (entry -> meters .put (entry .getValue (), registryMeters .get (entry .getKey ())));
153
+ .forEach (
154
+ entry -> counters .put (entry .getValue (), meterRegistry .get (entry .getKey ()).counter ()));
183
155
184
- Map <String , FormatCallback > formatMeter = new HashMap <>();
185
- metersNamesAndLabels .entrySet ().stream ()
156
+ Map <String , FormatCallback > formatCounter = new HashMap <>();
157
+ countersNamesAndLabels .entrySet ().stream ()
186
158
.filter (entry -> !entry .getKey ().contains ("bytes" ))
187
159
.forEach (
188
160
entry -> {
189
- formatMeter .put (
161
+ formatCounter .put (
190
162
entry .getValue (),
191
163
(lastValue , currentValue , duration ) -> {
192
164
long rate = 1000 * (currentValue - lastValue ) / duration .toMillis ();
193
165
return String .format ("%s %d msg/s, " , entry .getValue (), rate );
194
166
});
195
167
});
196
168
197
- metersNamesAndLabels .entrySet ().stream ()
169
+ countersNamesAndLabels .entrySet ().stream ()
198
170
.filter (entry -> entry .getKey ().contains ("bytes" ))
199
171
.forEach (
200
172
entry -> {
201
- formatMeter .put (
173
+ formatCounter .put (
202
174
entry .getValue (),
203
175
(lastValue , currentValue , duration ) -> {
204
176
long rate = 1000 * (currentValue - lastValue ) / duration .toMillis ();
205
177
return formatByteRate (entry .getValue (), rate ) + ", " ;
206
178
});
207
179
});
208
180
209
- Histogram chunkSize = metricRegistry .getHistograms ().get (metricChunkSize );
210
- Function <Histogram , String > formatChunkSize =
211
- histogram -> String .format ("chunk size %.0f" , histogram .getSnapshot ().getMean ());
212
-
213
- com .codahale .metrics .Timer latency = metricRegistry .getTimers ().get (metricLatency );
214
- com .codahale .metrics .Timer confirmLatency =
215
- confirmLatency () ? metricRegistry .getTimers ().get (metricConfirmLatency ) : null ;
181
+ HistogramSupport chunkSize = meterRegistry .get (metricChunkSize ).summary ();
182
+ Function <HistogramSupport , String > formatChunkSize =
183
+ histogram -> String .format ("chunk size %.0f" , histogram .takeSnapshot ().mean ());
216
184
217
185
Function <Number , Number > convertDuration =
218
186
in -> in instanceof Long ? in .longValue () / 1_000_000 : in .doubleValue () / 1_000_000 ;
219
- BiFunction <String , com . codahale . metrics . Timer , String > formatLatency =
187
+ BiFunction <String , Timer , String > formatLatency =
220
188
(name , timer ) -> {
221
- Snapshot snapshot = timer .getSnapshot ();
189
+ HistogramSnapshot snapshot = timer .takeSnapshot ();
190
+
222
191
return String .format (
223
- name + " min/median/75th/95th/99th %.0f/%.0f/%.0f/%.0f/%.0f ms" ,
224
- convertDuration .apply (snapshot .getMin ()),
225
- convertDuration .apply (snapshot .getMedian ()),
226
- convertDuration .apply (snapshot .get75thPercentile ()),
227
- convertDuration .apply (snapshot .get95thPercentile ()),
228
- convertDuration .apply (snapshot .get99thPercentile ()));
192
+ name + " median/75th/95th/99th %.0f/%.0f/%.0f/%.0f ms" ,
193
+ convertDuration .apply (percentile (snapshot , 0.5 ).value ()),
194
+ convertDuration .apply (percentile (snapshot , 0.75 ).value ()),
195
+ convertDuration .apply (percentile (snapshot , 0.95 ).value ()),
196
+ convertDuration .apply (percentile (snapshot , 0.99 ).value ()));
229
197
};
230
198
231
199
AtomicInteger reportCount = new AtomicInteger (1 );
232
200
233
201
AtomicLong lastTick = new AtomicLong (startTime );
234
- Map <String , Long > lastMetersValues = new ConcurrentHashMap <>(meters .size ());
235
- meters .entrySet ().forEach (e -> lastMetersValues .put (e .getKey (), e .getValue ().getCount ()));
202
+ Map <String , Long > lastMetersValues = new ConcurrentHashMap <>(counters .size ());
203
+ counters .entrySet ().forEach (e -> lastMetersValues .put (e .getKey (), ( long ) e .getValue ().count ()));
236
204
237
205
ScheduledFuture <?> consoleReportingTask =
238
206
scheduledExecutorService .scheduleAtFixedRate (
@@ -244,16 +212,16 @@ public void start(String description) throws Exception {
244
212
lastTick .set (currentTime );
245
213
StringBuilder builder = new StringBuilder ();
246
214
builder .append (reportCount .get ()).append (", " );
247
- meters
215
+ counters
248
216
.entrySet ()
249
217
.forEach (
250
218
entry -> {
251
219
String meterName = entry .getKey ();
252
- Meter meter = entry .getValue ();
220
+ Counter counter = entry .getValue ();
253
221
long lastValue = lastMetersValues .get (meterName );
254
- long currentValue = meter . getCount ();
222
+ long currentValue = ( long ) counter . count ();
255
223
builder .append (
256
- formatMeter
224
+ formatCounter
257
225
.get (meterName )
258
226
.compute (lastValue , currentValue , duration ));
259
227
lastMetersValues .put (meterName , currentValue );
@@ -291,27 +259,28 @@ public void start(String description) throws Exception {
291
259
Duration d = Duration .ofNanos (System .nanoTime () - startTime );
292
260
Duration duration = d .getSeconds () <= 0 ? Duration .ofSeconds (1 ) : d ;
293
261
294
- Function <Map . Entry <String , Meter >, String > formatMeterSummary =
262
+ Function <Entry <String , Counter >, String > formatMeterSummary =
295
263
entry -> {
296
264
if (entry .getKey ().contains ("bytes" )) {
297
265
return formatByteRate (
298
- entry .getKey (), 1000 * entry .getValue ().getCount () / duration .toMillis ())
266
+ entry .getKey (),
267
+ 1000 * (long ) entry .getValue ().count () / duration .toMillis ())
299
268
+ ", " ;
300
269
} else {
301
270
return String .format (
302
271
"%s %d msg/s, " ,
303
- entry .getKey (), 1000 * entry .getValue ().getCount () / duration .toMillis ());
272
+ entry .getKey (), 1000 * ( long ) entry .getValue ().count () / duration .toMillis ());
304
273
}
305
274
};
306
275
307
- BiFunction <String , com . codahale . metrics . Timer , String > formatLatencySummary =
276
+ BiFunction <String , HistogramSupport , String > formatLatencySummary =
308
277
(name , histogram ) ->
309
278
String .format (
310
279
name + " 95th %.0f ms" ,
311
- convertDuration .apply (histogram .getSnapshot (). get95thPercentile ()));
280
+ convertDuration .apply (percentile ( histogram .takeSnapshot (), 0.95 ). value ()));
312
281
313
282
StringBuilder builder = new StringBuilder ("Summary: " );
314
- meters .entrySet ().forEach (entry -> builder .append (formatMeterSummary .apply (entry )));
283
+ counters .entrySet ().forEach (entry -> builder .append (formatMeterSummary .apply (entry )));
315
284
if (confirmLatency ()) {
316
285
builder
317
286
.append (formatLatencySummary .apply ("confirm latency" , confirmLatency ))
@@ -360,8 +329,19 @@ private Closeable maybeSetSummaryFile(
360
329
printStream .println (description );
361
330
}
362
331
332
+ DropwizardMeterRegistry dropwizardMeterRegistry =
333
+ this .meterRegistry .getRegistries ().stream ()
334
+ .filter (r -> r instanceof DropwizardMeterRegistry )
335
+ .map (r -> (DropwizardMeterRegistry ) r )
336
+ .findAny ()
337
+ .orElseGet (() -> Utils .dropwizardMeterRegistry ());
338
+
339
+ if (!this .meterRegistry .getRegistries ().contains (dropwizardMeterRegistry )) {
340
+ this .meterRegistry .add (dropwizardMeterRegistry );
341
+ }
342
+
363
343
ConsoleReporter fileReporter =
364
- ConsoleReporter .forRegistry (metricRegistry )
344
+ ConsoleReporter .forRegistry (dropwizardMeterRegistry . getDropwizardRegistry () )
365
345
.filter ((name , metric ) -> allMetrics .contains (name ))
366
346
.convertRatesTo (TimeUnit .SECONDS )
367
347
.convertDurationsTo (TimeUnit .MILLISECONDS )
@@ -431,4 +411,17 @@ private interface FormatCallback {
431
411
432
412
String compute (long lastValue , long currentValue , Duration duration );
433
413
}
414
+
415
+ private String metricsName (String name ) {
416
+ return this .metricsPrefix + "." + name ;
417
+ }
418
+
419
+ private static ValueAtPercentile percentile (HistogramSnapshot snapshot , double expected ) {
420
+ for (ValueAtPercentile percentile : snapshot .percentileValues ()) {
421
+ if (percentile .percentile () == expected ) {
422
+ return percentile ;
423
+ }
424
+ }
425
+ return null ;
426
+ }
434
427
}
0 commit comments