Skip to content

Commit 2735d26

Browse files
Avoid duplicate publish on StepMeterRegistry when closed within first step. (#4485)
Do not perform a publish for the previous step if there has not been a previous step yet. This is achieved by internally tracking when polling of meters happens (to rollover values) and it will skip doing the publish for the previous step if they have never been rolled over. Resolves gh-4357
1 parent 16f59e9 commit 2735d26

File tree

4 files changed

+81
-29
lines changed

4 files changed

+81
-29
lines changed

implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java

+14-5
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ public class OtlpMeterRegistry extends PushMeterRegistry {
8787

8888
private long deltaAggregationTimeUnixNano = 0L;
8989

90+
// Time when the last scheduled rollOver has started. Applicable only for delta
91+
// flavour.
92+
private long lastMeterRolloverStartTime = -1;
93+
9094
@Nullable
9195
private ScheduledExecutorService meterPollingService;
9296

@@ -244,8 +248,8 @@ protected DistributionStatisticConfig defaultHistogramConfig() {
244248
public void close() {
245249
stop();
246250
if (config.enabled() && isDelta() && !isClosed()) {
247-
if (!isDataPublishedForCurrentStep() && !isPublishing()) {
248-
// Data was not published for the current step. So, we should flush that
251+
if (shouldPublishDataForLastStep() && !isPublishing()) {
252+
// Data was not published for the last step. So, we should flush that
249253
// first.
250254
try {
251255
publish();
@@ -264,9 +268,13 @@ else if (isPublishing()) {
264268
super.close();
265269
}
266270

267-
private boolean isDataPublishedForCurrentStep() {
268-
return (getLastScheduledPublishStartTime() / config.step().toMillis()) == (clock.wallTime()
269-
/ config.step().toMillis());
271+
private boolean shouldPublishDataForLastStep() {
272+
if (lastMeterRolloverStartTime < 0)
273+
return false;
274+
275+
final long lastPublishedStep = getLastScheduledPublishStartTime() / config.step().toMillis();
276+
final long lastPolledStep = lastMeterRolloverStartTime / config.step().toMillis();
277+
return lastPublishedStep < lastPolledStep;
270278
}
271279

272280
// Either we do this or make StepMeter public
@@ -343,6 +351,7 @@ private Metric writeSum(Meter meter, DoubleSupplier count) {
343351
*/
344352
// VisibleForTesting
345353
void pollMetersToRollover() {
354+
this.lastMeterRolloverStartTime = clock.wallTime();
346355
this.getMeters()
347356
.forEach(m -> m.match(gauge -> null, Counter::count, Timer::takeSnapshot, DistributionSummary::takeSnapshot,
348357
meter -> null, meter -> null, FunctionCounter::count, FunctionTimer::count, meter -> null));

implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpDeltaMeterRegistryTest.java

+24-2
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,20 @@ void whenCloseDuringScheduledPublish_thenPreviousStepAndCurrentPartialStepArePub
726726
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24);
727727
}
728728

729+
@Test
730+
@Issue("#4357")
731+
void publishOnceWhenClosedWithinFirstStep() {
732+
// Set the initial clock time to a valid time.
733+
MockClock mockClock = new MockClock();
734+
mockClock.add(otlpConfig().step().multipliedBy(5));
735+
736+
TestOtlpMeterRegistry stepMeterRegistry = new TestOtlpMeterRegistry(otlpConfig(), mockClock);
737+
738+
assertThat(stepMeterRegistry.publishCount.get()).isZero();
739+
stepMeterRegistry.close();
740+
assertThat(stepMeterRegistry.publishCount.get()).isEqualTo(1);
741+
}
742+
729743
private void assertEmptyHistogramSnapshot(HistogramSnapshot snapshot) {
730744
assertThat(snapshot.count()).isZero();
731745
assertThat(snapshot.total()).isZero();
@@ -771,6 +785,8 @@ private void assertHistogramContains(HistogramSnapshot snapshot, double total, d
771785

772786
private class TestOtlpMeterRegistry extends OtlpMeterRegistry {
773787

788+
private final AtomicInteger publishCount = new AtomicInteger();
789+
774790
Deque<Double> publishedCounterCounts = new ArrayDeque<>();
775791

776792
Deque<Long> publishedTimerCounts = new ArrayDeque<>();
@@ -795,18 +811,24 @@ private class TestOtlpMeterRegistry extends OtlpMeterRegistry {
795811

796812
Deque<Double> publishedFunctionTimerTotals = new ArrayDeque<>();
797813

798-
private long lastScheduledPublishStartTime = 0L;
814+
private long lastScheduledPublishStartTime;
799815

800816
AtomicBoolean isPublishing = new AtomicBoolean(false);
801817

802818
CompletableFuture<Void> scheduledPublishingFuture = CompletableFuture.completedFuture(null);
803819

804820
TestOtlpMeterRegistry() {
805-
super(otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock);
821+
this(otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock);
822+
}
823+
824+
TestOtlpMeterRegistry(OtlpConfig otlpConfig, Clock clock) {
825+
super(otlpConfig, clock);
826+
this.lastScheduledPublishStartTime = super.getLastScheduledPublishStartTime();
806827
}
807828

808829
@Override
809830
protected void publish() {
831+
publishCount.incrementAndGet();
810832
forEachMeter(meter -> meter.match(null, this::publishCounter, this::publishTimer, this::publishSummary,
811833
null, null, this::publishFunctionCounter, this::publishFunctionTimer, null));
812834
}

micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java

+14-6
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public abstract class StepMeterRegistry extends PushMeterRegistry {
4949
@Nullable
5050
private ScheduledExecutorService meterPollingService;
5151

52+
// Time when the last scheduled rollOver has started.
53+
private long lastMeterRolloverStartTime = -1;
54+
5255
public StepMeterRegistry(StepRegistryConfig config, Clock clock) {
5356
super(config, clock);
5457
this.config = config;
@@ -139,9 +142,9 @@ public void close() {
139142
stop();
140143

141144
if (config.enabled() && !isClosed()) {
142-
if (!isDataPublishedForCurrentStep() && !isPublishing()) {
143-
// Data was not published for the current step. So, we should flush that
144-
// first.
145+
if (shouldPublishDataForLastStep() && !isPublishing()) {
146+
// Data was not published for the last completed step. So, we should flush
147+
// that first.
145148
try {
146149
publish();
147150
}
@@ -159,9 +162,13 @@ else if (isPublishing()) {
159162
super.close();
160163
}
161164

162-
private boolean isDataPublishedForCurrentStep() {
163-
return (getLastScheduledPublishStartTime() / config.step().toMillis()) == (clock.wallTime()
164-
/ config.step().toMillis());
165+
private boolean shouldPublishDataForLastStep() {
166+
if (lastMeterRolloverStartTime < 0)
167+
return false;
168+
169+
final long lastPublishedStep = getLastScheduledPublishStartTime() / config.step().toMillis();
170+
final long lastPolledStep = lastMeterRolloverStartTime / config.step().toMillis();
171+
return lastPublishedStep < lastPolledStep;
165172
}
166173

167174
/**
@@ -181,6 +188,7 @@ private void closingRolloverStepMeters() {
181188
*/
182189
// VisibleForTesting
183190
void pollMetersToRollover() {
191+
this.lastMeterRolloverStartTime = clock.wallTime();
184192
this.getMeters()
185193
.forEach(m -> m.match(gauge -> null, Counter::count, Timer::count, DistributionSummary::count,
186194
meter -> null, meter -> null, FunctionCounter::count, FunctionTimer::count, meter -> null));

micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java

+29-16
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@
5050
*/
5151
class StepMeterRegistryTest {
5252

53-
private final AtomicInteger publishes = new AtomicInteger();
54-
5553
private final MockClock clock = new MockClock();
5654

5755
private final StepRegistryConfig config = new StepRegistryConfig() {
@@ -98,9 +96,9 @@ void serviceLevelObjectivesOnlyNoPercentileHistogram() {
9896
@Issue("#484")
9997
@Test
10098
void publishOneLastTimeOnClose() {
101-
assertThat(publishes.get()).isEqualTo(0);
99+
assertThat(registry.publishCount.get()).isZero();
102100
registry.close();
103-
assertThat(publishes.get()).isEqualTo(1);
101+
assertThat(registry.publishCount.get()).isEqualTo(1);
104102
}
105103

106104
@Issue("#1993")
@@ -425,7 +423,7 @@ void scheduledRollOver() {
425423
}
426424

427425
@Test
428-
@Issue("3914")
426+
@Issue("#3914")
429427
void publishShouldNotHappenWhenRegistryIsDisabled() {
430428
StepRegistryConfig disabledStepRegistryConfig = new StepRegistryConfig() {
431429
@Override
@@ -449,27 +447,26 @@ public String get(String key) {
449447
Counter.builder("publish_disabled_counter").register(disabledStepMeterRegistry).increment();
450448

451449
clock.add(config.step());
452-
assertThat(publishes.get()).isZero();
450+
assertThat(disabledStepMeterRegistry.publishCount.get()).isZero();
453451
disabledStepMeterRegistry.close();
454-
assertThat(publishes.get()).isZero();
452+
assertThat(disabledStepMeterRegistry.publishCount.get()).isZero();
455453
}
456454

457455
@Test
458-
@Issue("3914")
456+
@Issue("#3914")
459457
void publishShouldNotHappenWhenRegistryIsClosed() {
460458
Counter.builder("my.counter").register(registry).increment();
461459

462460
clock.add(config.step());
463-
assertThat(publishes.get()).isZero();
461+
assertThat(registry.publishCount.get()).isZero();
464462
registry.close();
465-
assertThat(publishes.get()).isEqualTo(2);
466-
assertThat(registry.publishedCounterCounts).hasSize(2);
467-
assertThat(registry.publishedCounterCounts.getFirst()).isOne();
468-
assertThat(registry.publishedCounterCounts.getLast()).isZero();
463+
assertThat(registry.publishCount.get()).isEqualTo(1);
464+
assertThat(registry.publishedCounterCounts).hasSize(1);
469465

470466
clock.add(config.step());
471467
registry.close();
472-
assertThat(publishes.get()).isEqualTo(2);
468+
assertThat(registry.publishCount.get()).isEqualTo(1);
469+
assertThat(registry.publishedCounterCounts).hasSize(1);
473470
}
474471

475472
@Test
@@ -557,8 +554,23 @@ void whenCloseDuringScheduledPublish_thenPreviousStepAndCurrentPartialStepArePub
557554
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24);
558555
}
559556

557+
@Test
558+
@Issue("#4357")
559+
void publishOnceWhenClosedWithinFirstStep() {
560+
// Set the initial clock time to a valid time.
561+
MockClock mockClock = new MockClock();
562+
mockClock.add(config.step().multipliedBy(5));
563+
564+
MyStepMeterRegistry stepMeterRegistry = new MyStepMeterRegistry(config, mockClock);
565+
assertThat(stepMeterRegistry.publishCount.get()).isZero();
566+
stepMeterRegistry.close();
567+
assertThat(stepMeterRegistry.publishCount.get()).isEqualTo(1);
568+
}
569+
560570
private class MyStepMeterRegistry extends StepMeterRegistry {
561571

572+
private final AtomicInteger publishCount = new AtomicInteger();
573+
562574
Deque<Double> publishedCounterCounts = new ArrayDeque<>();
563575

564576
Deque<Long> publishedTimerCounts = new ArrayDeque<>();
@@ -575,7 +587,7 @@ private class MyStepMeterRegistry extends StepMeterRegistry {
575587

576588
Deque<Double> publishedFunctionTimerTotals = new ArrayDeque<>();
577589

578-
private long lastScheduledPublishStartTime = 0L;
590+
private long lastScheduledPublishStartTime;
579591

580592
@Nullable
581593
Runnable prePublishAction;
@@ -590,6 +602,7 @@ private class MyStepMeterRegistry extends StepMeterRegistry {
590602

591603
MyStepMeterRegistry(StepRegistryConfig config, Clock clock) {
592604
super(config, clock);
605+
this.lastScheduledPublishStartTime = super.getLastScheduledPublishStartTime();
593606
}
594607

595608
void setPrePublishAction(Runnable prePublishAction) {
@@ -601,7 +614,7 @@ protected void publish() {
601614
if (prePublishAction != null) {
602615
prePublishAction.run();
603616
}
604-
publishes.incrementAndGet();
617+
publishCount.incrementAndGet();
605618
getMeters().stream()
606619
.map(meter -> meter.match(g -> null, this::publishCounter, this::publishTimer, this::publishSummary,
607620
null, tg -> null, this::publishFunctionCounter, this::publishFunctionTimer, m -> null))

0 commit comments

Comments
 (0)