
Commit 04f6d9c

Fix delta metric storage concurrency bug (#5932)
1 parent 83993e0 · commit 04f6d9c

File tree: 3 files changed, +149 -9 lines
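Background on the fix: with delta temporality, the old code aggregated and removed handles from a single shared ConcurrentHashMap during collection while recording threads could still be writing into handles that had already been drained, so those measurements could be silently lost. The commit instead keeps the handle map inside an AggregatorHolder guarded by a ReentrantReadWriteLock: recordings take the shared read lock, and a delta collection swaps in a fresh holder and then briefly takes the old holder's write lock so all in-flight recordings finish before its handles are read. The following is a minimal standalone sketch of that holder-swap pattern; the class and method names (SimpleDeltaCounter, record, collectAndReset) are hypothetical and not part of the SDK.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical illustration of the holder-swap pattern applied by this commit. */
public class SimpleDeltaCounter {

  // The holder pairs the live map with the lock that guards its retirement.
  private static final class Holder {
    final ConcurrentHashMap<String, LongAdder> values = new ConcurrentHashMap<>();
    final ReadWriteLock lock = new ReentrantReadWriteLock();
  }

  private volatile Holder holder = new Holder();

  /** Record path: shared read lock, so recordings run concurrently with each other. */
  public void record(String key, long value) {
    Holder current = holder;
    Lock readLock = current.lock.readLock();
    readLock.lock();
    try {
      current.values.computeIfAbsent(key, k -> new LongAdder()).add(value);
    } finally {
      readLock.unlock();
    }
  }

  /** Collect path: swap in a fresh holder, then wait out in-flight recordings. */
  public Map<String, Long> collectAndReset() {
    Holder old = this.holder;
    this.holder = new Holder(); // new recordings start landing in the fresh holder
    Lock writeLock = old.lock.writeLock();
    writeLock.lock(); // blocks until recordings holding the old read lock finish
    try {
      Map<String, Long> snapshot = new ConcurrentHashMap<>();
      old.values.forEach((k, v) -> snapshot.put(k, v.sum()));
      return snapshot; // the old map is discarded wholesale, never drained entry by entry
    } finally {
      writeLock.unlock();
    }
  }
}
```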

sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/TestSdk.java (+14 -1)
@@ -38,7 +38,7 @@ Meter build() {
               .get("io.opentelemetry.sdk.metrics");
         }
       }),
-  SDK(
+  SDK_CUMULATIVE(
       new SdkBuilder() {
         @Override
         Meter build() {
@@ -50,6 +50,19 @@ Meter build() {
               .build()
               .get("io.opentelemetry.sdk.metrics");
         }
+      }),
+  SDK_DELTA(
+      new SdkBuilder() {
+        @Override
+        Meter build() {
+          return SdkMeterProvider.builder()
+              .setClock(Clock.getDefault())
+              .setResource(Resource.empty())
+              // Must register reader for real SDK.
+              .registerMetricReader(InMemoryMetricReader.createDelta())
+              .build()
+              .get("io.opentelemetry.sdk.metrics");
+        }
       });
 
   private final SdkBuilder sdkBuilder;
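The only difference between the two new benchmark variants is the reader's aggregation temporality. A rough sketch of how the two reader flavors behave from user code, assuming the public SdkMeterProvider and InMemoryMetricReader APIs; the counter name and values are made up for illustration:

```java
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;

public class TemporalityExample {
  public static void main(String[] args) {
    // Delta reader: each collection reports only what was recorded since the last collection.
    InMemoryMetricReader deltaReader = InMemoryMetricReader.createDelta();
    // Cumulative reader: each collection reports the running total since the start.
    InMemoryMetricReader cumulativeReader = InMemoryMetricReader.create();

    SdkMeterProvider meterProvider =
        SdkMeterProvider.builder()
            .registerMetricReader(deltaReader)
            .registerMetricReader(cumulativeReader)
            .build();
    Meter meter = meterProvider.get("example");
    LongCounter counter = meter.counterBuilder("requests").build(); // hypothetical counter

    counter.add(5);
    deltaReader.collectAllMetrics();      // point value 5
    cumulativeReader.collectAllMetrics(); // point value 5

    counter.add(5);
    deltaReader.collectAllMetrics();      // point value 5 (only the new interval)
    cumulativeReader.collectAllMetrics(); // point value 10 (running total)
  }
}
```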

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java (+47 -8)
@@ -26,6 +26,9 @@
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -46,8 +49,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
   private final MetricDescriptor metricDescriptor;
   private final AggregationTemporality aggregationTemporality;
   private final Aggregator<T, U> aggregator;
-  private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles =
-      new ConcurrentHashMap<>();
+  private volatile AggregatorHolder<T, U> aggregatorHolder = new AggregatorHolder<>();
   private final AttributesProcessor attributesProcessor;
 
   /**
@@ -83,8 +85,15 @@ Queue<AggregatorHandle<T, U>> getAggregatorHandlePool() {
 
   @Override
   public void recordLong(long value, Attributes attributes, Context context) {
-    AggregatorHandle<T, U> handle = getAggregatorHandle(attributes, context);
-    handle.recordLong(value, attributes, context);
+    Lock readLock = aggregatorHolder.lock.readLock();
+    readLock.lock();
+    try {
+      AggregatorHandle<T, U> handle =
+          getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context);
+      handle.recordLong(value, attributes, context);
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override
@@ -99,11 +108,21 @@ public void recordDouble(double value, Attributes attributes, Context context) {
               + ". Dropping measurement.");
       return;
     }
-    AggregatorHandle<T, U> handle = getAggregatorHandle(attributes, context);
-    handle.recordDouble(value, attributes, context);
+    Lock readLock = aggregatorHolder.lock.readLock();
+    readLock.lock();
+    try {
+      AggregatorHandle<T, U> handle =
+          getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context);
+      handle.recordDouble(value, attributes, context);
+    } finally {
+      readLock.unlock();
+    }
   }
 
-  private AggregatorHandle<T, U> getAggregatorHandle(Attributes attributes, Context context) {
+  private AggregatorHandle<T, U> getAggregatorHandle(
+      ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles,
+      Attributes attributes,
+      Context context) {
     Objects.requireNonNull(attributes, "attributes");
     attributes = attributesProcessor.process(attributes, context);
     AggregatorHandle<T, U> handle = aggregatorHandles.get(attributes);
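Note the signature change: getAggregatorHandle now receives the handle map that was captured while the read lock is held, instead of re-reading the aggregatorHolder field on its own. That keeps the lookup-or-create and the subsequent recording on one consistent map even if a collection swaps in a new holder concurrently. Because the record paths only take the shared read lock, recording threads never block one another; they only coordinate with the collect-time swap.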
@@ -146,13 +165,27 @@ public MetricData collect(
             ? registeredReader.getLastCollectEpochNanos()
             : startEpochNanos;
 
+    ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles;
+    if (reset) {
+      AggregatorHolder<T, U> holder = this.aggregatorHolder;
+      this.aggregatorHolder = new AggregatorHolder<>();
+      Lock writeLock = holder.lock.writeLock();
+      writeLock.lock();
+      try {
+        aggregatorHandles = holder.aggregatorHandles;
+      } finally {
+        writeLock.unlock();
+      }
+    } else {
+      aggregatorHandles = this.aggregatorHolder.aggregatorHandles;
+    }
+
     // Grab aggregated points.
     List<T> points = new ArrayList<>(aggregatorHandles.size());
     aggregatorHandles.forEach(
         (attributes, handle) -> {
          T point = handle.aggregateThenMaybeReset(start, epochNanos, attributes, reset);
          if (reset) {
-            aggregatorHandles.remove(attributes, handle);
            // Return the aggregator to the pool.
            aggregatorHandlePool.offer(handle);
          }
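In collect, the holder swap happens before the old holder's write lock is taken: new recordings immediately start landing in the fresh holder, and acquiring the write lock then blocks only until recordings that already hold the old read lock finish, so their measurements are included in the points read below. With the whole map retired at once, the previous per-entry aggregatorHandles.remove(attributes, handle) call is no longer needed, which closes the window where a recorder could write into a handle that collection had already drained and discarded.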
@@ -180,4 +213,10 @@ public MetricData collect(
   public MetricDescriptor getMetricDescriptor() {
     return metricDescriptor;
   }
+
+  private static class AggregatorHolder<T extends PointData, U extends ExemplarData> {
+    private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles =
+        new ConcurrentHashMap<>();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  }
 }

sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java (+88)
@@ -12,6 +12,8 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import com.google.common.util.concurrent.AtomicDouble;
+import com.google.common.util.concurrent.Uninterruptibles;
 import io.github.netmikey.logunit.api.LogCapturer;
 import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.common.Attributes;
@@ -21,9 +23,11 @@
 import io.opentelemetry.sdk.metrics.Aggregation;
 import io.opentelemetry.sdk.metrics.InstrumentType;
 import io.opentelemetry.sdk.metrics.InstrumentValueType;
+import io.opentelemetry.sdk.metrics.data.ExemplarData;
 import io.opentelemetry.sdk.metrics.data.LongExemplarData;
 import io.opentelemetry.sdk.metrics.data.LongPointData;
 import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.PointData;
 import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
 import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
 import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData;
@@ -37,8 +41,17 @@
 import io.opentelemetry.sdk.resources.Resource;
 import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
 import io.opentelemetry.sdk.testing.time.TestClock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.slf4j.event.Level;
 
 @SuppressLogger(DefaultSynchronousMetricStorage.class)
@@ -370,4 +383,79 @@ void recordAndCollect_DeltaAtLimit() {
     assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT);
     logs.assertContains("Instrument name has exceeded the maximum allowed cardinality");
   }
+
+  @ParameterizedTest
+  @MethodSource("concurrentStressTestArguments")
+  void recordAndCollect_concurrentStressTest(
+      DefaultSynchronousMetricStorage<?, ?> storage, BiConsumer<Double, AtomicDouble> collect) {
+    // Define record threads. Each records a value of 1.0, 2000 times
+    List<Thread> threads = new ArrayList<>();
+    CountDownLatch latch = new CountDownLatch(4);
+    for (int i = 0; i < 4; i++) {
+      Thread thread =
+          new Thread(
+              () -> {
+                for (int j = 0; j < 2000; j++) {
+                  storage.recordDouble(1.0, Attributes.empty(), Context.current());
+                  Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1));
+                }
+                latch.countDown();
+              });
+      threads.add(thread);
+    }
+
+    // Define collect thread. Collect thread collects and aggregates points into cumulativeSum.
+    AtomicDouble cumulativeSum = new AtomicDouble();
+    Thread collectThread =
+        new Thread(
+            () -> {
+              do {
+                Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1));
+                MetricData metricData =
+                    storage.collect(Resource.empty(), InstrumentationScopeInfo.empty(), 0, 1);
+                if (metricData.isEmpty()) {
+                  continue;
+                }
+                metricData.getDoubleSumData().getPoints().stream()
+                    .findFirst()
+                    .ifPresent(pointData -> collect.accept(pointData.getValue(), cumulativeSum));
+              } while (latch.getCount() != 0);
+            });
+
+    // Start all the threads
+    collectThread.start();
+    threads.forEach(Thread::start);
+
+    // Wait for the collect thread to end, which collects until the record threads are done
+    Uninterruptibles.joinUninterruptibly(collectThread);
+
+    assertThat(cumulativeSum.get()).isEqualTo(8000.0);
+  }
+
+  private static Stream<Arguments> concurrentStressTestArguments() {
+    Aggregator<PointData, ExemplarData> aggregator =
+        ((AggregatorFactory) Aggregation.sum())
+            .createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff());
+    return Stream.of(
+        Arguments.of(
+            // Delta
+            new DefaultSynchronousMetricStorage<>(
+                RegisteredReader.create(InMemoryMetricReader.createDelta(), ViewRegistry.create()),
+                METRIC_DESCRIPTOR,
+                aggregator,
+                AttributesProcessor.noop(),
+                CARDINALITY_LIMIT),
+            (BiConsumer<Double, AtomicDouble>)
+                (value, cumulativeCount) -> cumulativeCount.addAndGet(value)),
+        Arguments.of(
+            // Cumulative
+            new DefaultSynchronousMetricStorage<>(
+                RegisteredReader.create(InMemoryMetricReader.create(), ViewRegistry.create()),
+                METRIC_DESCRIPTOR,
+                aggregator,
+                AttributesProcessor.noop(),
+                CARDINALITY_LIMIT),
+            (BiConsumer<Double, AtomicDouble>)
+                (value, cumulativeCount) -> cumulativeCount.set(value)));
+  }
 }
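The stress test drives four recording threads, each adding 1.0 two thousand times, so both parameterizations must account for 8000.0 in total. With the delta reader each collected point carries only that interval's sum, so the test accumulates points with addAndGet; with the cumulative reader each point already carries the running total, so the test keeps the latest value with set. Before the fix, the delta path could drop recordings that raced with a concurrent collect and come up short of 8000.0.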
