From 47fecb5f83e9497c1718c8bde9eced8c528805ec Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Thu, 14 Jul 2022 17:41:57 -0400 Subject: [PATCH 01/14] Introduced changes to make the library thread-safe. --- build.gradle | 7 ++- gradle/wrapper/gradle-wrapper.properties | 2 +- .../emf/logger/MetricsLogger.java | 63 ++++++++++++++----- .../cloudwatchlogs/emf/model/Metadata.java | 4 +- .../emf/model/MetricDirective.java | 21 ++++--- .../cloudwatchlogs/emf/model/RootNode.java | 3 +- .../emf/model/MetricsContextTest.java | 10 +++ 7 files changed, 82 insertions(+), 28 deletions(-) diff --git a/build.gradle b/build.gradle index b0d65d66..a9b4bbae 100644 --- a/build.gradle +++ b/build.gradle @@ -18,6 +18,7 @@ plugins { id 'com.diffplug.spotless' version '5.8.2' id 'maven-publish' id 'signing' + id "me.champeau.jmh" version "0.6.6" } group "software.amazon.cloudwatchlogs" @@ -78,6 +79,10 @@ dependencies { testImplementation 'software.amazon.awssdk:cloudwatch:2.13.54' testCompileOnly 'org.projectlombok:lombok:1.18.12' testAnnotationProcessor 'org.projectlombok:lombok:1.18.12' + + implementation 'org.openjdk.jmh:jmh-core:1.29' + implementation 'org.openjdk.jmh:jmh-generator-annprocess:1.29' + jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:1.29' } spotless { @@ -124,7 +129,7 @@ tasks.withType(JavaCompile) { } tasks.named('wrapper') { - gradleVersion = '6.5.1' + gradleVersion = '7.4.2' distributionType = Wrapper.DistributionType.ALL } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ac33e994..92f06b50 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.5.1-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-all.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java index 13c5a48b..9e8e3d95 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java @@ -18,6 +18,7 @@ import java.time.Instant; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantReadWriteLock; import lombok.extern.slf4j.Slf4j; import software.amazon.cloudwatchlogs.emf.environment.Environment; import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider; @@ -35,6 +36,7 @@ public class MetricsLogger { private MetricsContext context; private CompletableFuture environmentFuture; private EnvironmentProvider environmentProvider; + private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public MetricsLogger() { this(new EnvironmentProvider()); @@ -67,10 +69,16 @@ public void flush() { log.info("Failed to resolve environment. Fallback to default environment: ", ex); environment = environmentProvider.getDefaultEnvironment(); } - ISink sink = environment.getSink(); - configureContextForEnvironment(context, environment); - sink.accept(context); - context = context.createCopyWithContext(); + + rwl.writeLock().lock(); + try { + ISink sink = environment.getSink(); + configureContextForEnvironment(context, environment); + sink.accept(context); + context = context.createCopyWithContext(); + } finally { + rwl.writeLock().unlock(); + } } /** @@ -83,8 +91,13 @@ public void flush() { * @return the current logger */ public MetricsLogger putProperty(String key, Object value) { - this.context.putProperty(key, value); - return this; + rwl.readLock().lock(); + try { + this.context.putProperty(key, value); + return this; + } finally { + rwl.readLock().unlock(); + } } /** @@ -99,8 +112,13 @@ public MetricsLogger putProperty(String key, Object value) { * @return the current logger */ public MetricsLogger putDimensions(DimensionSet dimensions) { - context.putDimension(dimensions); - return this; + rwl.readLock().lock(); + try { + context.putDimension(dimensions); + return this; + } finally { + rwl.readLock().unlock(); + } } /** @@ -113,8 +131,13 @@ public MetricsLogger putDimensions(DimensionSet dimensions) { * @return the current logger */ public MetricsLogger setDimensions(DimensionSet... dimensionSets) { - context.setDimensions(dimensionSets); - return this; + rwl.readLock().lock(); + try { + context.setDimensions(dimensionSets); + return this; + } finally { + rwl.readLock().unlock(); + } } /** @@ -128,8 +151,13 @@ public MetricsLogger setDimensions(DimensionSet... dimensionSets) { * @return the current logger */ public MetricsLogger putMetric(String key, double value, Unit unit) { - this.context.putMetric(key, value, unit); - return this; + rwl.readLock().lock(); + try { + this.context.putMetric(key, value, unit); + return this; + } finally { + rwl.readLock().unlock(); + } } /** @@ -142,7 +170,7 @@ public MetricsLogger putMetric(String key, double value, Unit unit) { * @return the current logger */ public MetricsLogger putMetric(String key, double value) { - this.context.putMetric(key, value, Unit.NONE); + this.putMetric(key, value, Unit.NONE); return this; } @@ -157,8 +185,13 @@ public MetricsLogger putMetric(String key, double value) { * @return the current logger */ public MetricsLogger putMetadata(String key, Object value) { - this.context.putMetadata(key, value); - return this; + rwl.readLock().lock(); + try { + this.context.putMetadata(key, value); + return this; + } finally { + rwl.readLock().unlock(); + } } /** diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/model/Metadata.java b/src/main/java/software/amazon/cloudwatchlogs/emf/model/Metadata.java index 638850e5..3ebe48ae 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/model/Metadata.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/model/Metadata.java @@ -23,9 +23,9 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import java.time.Instant; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; @@ -56,7 +56,7 @@ class Metadata { Metadata() { cloudWatchMetrics = new ArrayList<>(); timestamp = Instant.now(); - customFields = new HashMap<>(); + customFields = new ConcurrentHashMap<>(); } /** diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricDirective.java b/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricDirective.java index 8d52094a..100faa41 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricDirective.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricDirective.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import lombok.*; @@ -45,8 +46,8 @@ class MetricDirective { MetricDirective() { namespace = "aws-embedded-metrics"; - metrics = new HashMap<>(); - dimensions = new ArrayList<>(); + metrics = new ConcurrentHashMap<>(); + dimensions = Collections.synchronizedList(new ArrayList<>()); defaultDimensions = new DimensionSet(); shouldUseDefaultDimension = true; } @@ -60,11 +61,15 @@ void putMetric(String key, double value) { } void putMetric(String key, double value, Unit unit) { - if (metrics.containsKey(key)) { - metrics.get(key).addValue(value); - } else { - metrics.put(key, new MetricDefinition(key, unit, value)); - } + metrics.compute( + key, + (k, v) -> { + if (v == null) return new MetricDefinition(key, unit, value); + else { + v.addValue(value); + return v; + } + }); } @JsonProperty("Metrics") @@ -86,7 +91,7 @@ List> getAllDimensionKeys() { */ void setDimensions(List dimensionSets) { shouldUseDefaultDimension = false; - dimensions = new ArrayList<>(dimensionSets); + dimensions = Collections.synchronizedList(new ArrayList<>(dimensionSets)); } /** diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/model/RootNode.java b/src/main/java/software/amazon/cloudwatchlogs/emf/model/RootNode.java index 50d74e70..fb790322 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/model/RootNode.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/model/RootNode.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.With; @@ -45,7 +46,7 @@ class RootNode { RootNode() { aws = new Metadata(); - properties = new HashMap<>(); + properties = new ConcurrentHashMap<>(); objectMapper.setFilterProvider(filterProvider); } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java index 703a87c5..b9dee458 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java @@ -168,6 +168,16 @@ public void testSetTimestamp() throws JsonProcessingException { assertEquals(metadata.get("Timestamp"), now.toEpochMilli()); } + @Test + public void testPutMetadata() { + MetricsContext mc = new MetricsContext(); + mc.putMetadata("Metadata", "MetadataValue"); + + Map customFields = mc.getRootNode().getAws().getCustomMetadata(); + assertEquals(customFields.size(), 1); + assertEquals(customFields.get("Metadata"), "MetadataValue"); + } + @SuppressWarnings("unchecked") private ArrayList parseMetrics(String event) throws JsonProcessingException { Map rootNode = parseRootNode(event); From a0f0faede9f4809b5dfcab0fc0adc63072b72ee2 Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Thu, 14 Jul 2022 18:43:31 -0400 Subject: [PATCH 02/14] Added thread-safety tests for the library. --- .../logger/MetricsLoggerThreadSafetyTest.java | 406 ++++++++++++++++++ .../MetricDirectiveThreadSafetyTest.java | 75 ++++ .../model/MetricsContextThreadSafetyTest.java | 45 ++ .../emf/sinks/GroupedSinkShunt.java | 57 +++ 4 files changed, 583 insertions(+) create mode 100644 src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java create mode 100644 src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java create mode 100644 src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java create mode 100644 src/test/java/software/amazon/cloudwatchlogs/emf/sinks/GroupedSinkShunt.java diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java new file mode 100644 index 00000000..69c87e45 --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java @@ -0,0 +1,406 @@ +package software.amazon.cloudwatchlogs.emf.logger; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.json.JsonMapper; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NonNull; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import software.amazon.cloudwatchlogs.emf.environment.Environment; +import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider; +import software.amazon.cloudwatchlogs.emf.model.DimensionSet; +import software.amazon.cloudwatchlogs.emf.model.MetricsContext; +import software.amazon.cloudwatchlogs.emf.model.Unit; +import software.amazon.cloudwatchlogs.emf.serializers.UnitDeserializer; +import software.amazon.cloudwatchlogs.emf.serializers.UnitSerializer; +import software.amazon.cloudwatchlogs.emf.sinks.GroupedSinkShunt; +import software.amazon.cloudwatchlogs.emf.sinks.SinkShunt; + +public class MetricsLoggerThreadSafetyTest { + private volatile MetricsLogger logger; + private EnvironmentProvider envProvider; + private SinkShunt sink; + private Environment environment; + private volatile Throwable throwable; + + @Before + public void setUp() { + envProvider = mock(EnvironmentProvider.class); + environment = mock(Environment.class); + sink = new SinkShunt(); + throwable = null; + + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(sink); + logger = new MetricsLogger(envProvider); + } + + @Test + public void testConcurrentPutProperty() throws InterruptedException { + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[100]; + for (int i = 0; i < 100; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + logger.putProperty("Property-" + id, String.valueOf(id)); + } catch (Throwable e) { + throwable = e; // ensure no exceptions are thrown + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + logger.flush(); + for (int i = 0; i < 100; i++) { + Assert.assertEquals( + sink.getContext().getProperty("Property-" + i), String.valueOf(i)); + } + } + + @Test + public void testConcurrentPutDimension() throws InterruptedException { + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[100]; + for (int i = 0; i < 100; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + logger.putDimensions( + DimensionSet.of("Dim", String.valueOf(id))); + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + logger.flush(); + + List dimensions = sink.getContext().getDimensions(); + // check size + Assert.assertEquals(sink.getContext().getDimensions().size(), 100); + for (DimensionSet dim : dimensions) { + Assert.assertEquals( + dim.getDimensionKeys().size(), 4); // there are 3 default dimensions + } + // check content + Collections.sort( + dimensions, + Comparator.comparingInt(d -> Integer.parseInt(d.getDimensionValue("Dim")))); + for (int i = 0; i < 100; i++) { + Assert.assertEquals(dimensions.get(i).getDimensionValue("Dim"), String.valueOf(i)); + } + } + + @Test + public void testConcurrentPutDimensionAfterSetDimension() throws InterruptedException { + logger = new MetricsLogger(envProvider); + logger.setDimensions(DimensionSet.of("Dim", "0")); + + Thread[] threads = new Thread[99]; + for (int i = 0; i < 99; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + logger.putDimensions( + DimensionSet.of("Dim", String.valueOf(id + 1))); + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + logger.flush(); + + List dimensions = sink.getContext().getDimensions(); + // check size + Assert.assertEquals(sink.getContext().getDimensions().size(), 100); + for (DimensionSet dim : dimensions) { + Assert.assertEquals( + dim.getDimensionKeys().size(), + 1); // there are no default dimensions after set + } + // check content + Collections.sort( + dimensions, + Comparator.comparingInt(d -> Integer.parseInt(d.getDimensionValue("Dim")))); + for (int i = 0; i < 100; i++) { + Assert.assertEquals(dimensions.get(i).getDimensionValue("Dim"), String.valueOf(i)); + } + } + + @Test + public void testConcurrentFlush() throws InterruptedException, JsonProcessingException { + GroupedSinkShunt groupedSink = new GroupedSinkShunt(); + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(groupedSink); + + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[100]; + for (int i = 0; i < 100; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + logger.putMetric("Metric-" + id, id); + logger.flush(); + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + ArrayList allMetrics = new ArrayList<>(); + for (List events : groupedSink.getLogEventList()) { + ArrayList metrics = parseMetrics(events); + allMetrics.addAll(metrics); + } + + assertEquals(allMetrics.size(), 100); + for (MetricDefinitionCopy metric : allMetrics) { + assertEquals(metric.getValues().size(), 1); + } + Collections.sort(allMetrics, Comparator.comparingDouble(m -> m.getValues().get(0))); + for (int i = 0; i < 100; i++) { + assertEquals(allMetrics.get(i).getName(), "Metric-" + i); + assertEquals(allMetrics.get(i).getValues().get(0), i, 1e-5); + } + } + + @Test + public void testConcurrentFlushAndPutMetric() + throws InterruptedException, JsonProcessingException { + GroupedSinkShunt groupedSink = new GroupedSinkShunt(); + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(groupedSink); + + logger = new MetricsLogger(envProvider); + Random rand = new Random(); + + Thread[] threads = new Thread[100]; + for (int i = 0; i < 100; i++) { + final int id = i; + int randTime = rand.nextInt(1000); + threads[i] = + new Thread( + () -> { + try { + // half threads do putMetric(), half do flush() + Thread.sleep(randTime); + if (id % 2 == 0) { + for (int j = id; j < id + 2; j++) { + logger.putMetric("Metric-" + j, j); + } + } else { + logger.flush(); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + logger.flush(); + + ArrayList allMetrics = new ArrayList<>(); + for (List events : groupedSink.getLogEventList()) { + ArrayList metrics = parseMetrics(events); + allMetrics.addAll(metrics); + } + + assertEquals(allMetrics.size(), 100); + for (MetricDefinitionCopy metric : allMetrics) { + assertEquals(metric.getValues().size(), 1); + } + Collections.sort(allMetrics, Comparator.comparingDouble(m -> m.getValues().get(0))); + for (int i = 0; i < 100; i++) { + assertEquals(allMetrics.get(i).getName(), "Metric-" + i); + assertEquals(allMetrics.get(i).getValues().get(0), i, 1e-5); + } + } + + @Test + public void testConcurrentFlushAndMethodsOtherThanPutMetric() throws InterruptedException { + GroupedSinkShunt groupedSink = new GroupedSinkShunt(); + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(groupedSink); + + logger = new MetricsLogger(envProvider); + Random rand = new Random(); + + Thread[] threads = new Thread[100]; + for (int i = 0; i < 100; i++) { + final int id = i; + int randTime = rand.nextInt(1000); + threads[i] = + new Thread( + () -> { + try { + Thread.sleep(randTime); + if (id < 30) { + logger.putDimensions( + DimensionSet.of("Dim", String.valueOf(id))); + } else if (id < 40) { + logger.putProperty("Property-" + id, id); + } else { + logger.flush(); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + logger.flush(); + + int contextNum = groupedSink.getContexts().size(); + MetricsContext finalContext = groupedSink.getContexts().get(contextNum - 1); + List dimensions = finalContext.getDimensions(); + // check size + assertEquals(dimensions.size(), 30); + for (DimensionSet dim : dimensions) { + Assert.assertEquals(dim.getDimensionKeys().size(), 4); // there are 3 default dimensions + } + // check content + Collections.sort( + dimensions, + Comparator.comparingInt(d -> Integer.parseInt(d.getDimensionValue("Dim")))); + for (int i = 0; i < 30; i++) { + Assert.assertEquals(dimensions.get(i).getDimensionValue("Dim"), String.valueOf(i)); + } + + int propertyCnt = 0; + for (MetricsContext mc : groupedSink.getContexts()) { + propertyCnt += mc.getProperty("Property-30") == null ? 0 : 1; + propertyCnt += mc.getProperty("Property-31") == null ? 0 : 1; + propertyCnt += mc.getProperty("Property-32") == null ? 0 : 1; + propertyCnt += mc.getProperty("Property-33") == null ? 0 : 1; + propertyCnt += mc.getProperty("Property-34") == null ? 0 : 1; + propertyCnt += mc.getProperty("Property-35") == null ? 0 : 1; + propertyCnt += mc.getProperty("Property-36") == null ? 0 : 1; + propertyCnt += mc.getProperty("Property-37") == null ? 0 : 1; + propertyCnt += mc.getProperty("Property-38") == null ? 0 : 1; + propertyCnt += mc.getProperty("Property-39") == null ? 0 : 1; + } + assertEquals(propertyCnt, 10); + } + + @After + public void tearDown() throws Throwable { + if (throwable != null) throw throwable; + } + + private Map parseRootNode(String event) throws JsonProcessingException { + return new JsonMapper().readValue(event, new TypeReference>() {}); + } + + @SuppressWarnings("unchecked") + // can not parse all metrics if metric number exceeds MAX_METRICS_PER_EVENT + private ArrayList parseMetrics(List events) + throws JsonProcessingException { + Map rootNode = parseRootNode(events.get(0)); + Map metadata = (Map) rootNode.get("_aws"); + + if (metadata == null) { + return new ArrayList<>(); + } + + ArrayList> metricDirectives = + (ArrayList>) metadata.get("CloudWatchMetrics"); + ArrayList> metrics = + (ArrayList>) metricDirectives.get(0).get("Metrics"); + + ArrayList metricDefinitions = new ArrayList<>(); + for (Map metric : metrics) { + String name = metric.get("Name"); + Unit unit = Unit.fromValue(metric.get("Unit")); + Object value = rootNode.get(name); + if (value instanceof ArrayList) { + metricDefinitions.add(new MetricDefinitionCopy(name, unit, (ArrayList) value)); + } else { + metricDefinitions.add(new MetricDefinitionCopy(name, unit, (double) value)); + } + } + return metricDefinitions; + } + + @AllArgsConstructor + private class MetricDefinitionCopy { + @NonNull + @Getter + @JsonProperty("Name") + private String name; + + @Getter + @JsonProperty("Unit") + @JsonSerialize(using = UnitSerializer.class) + @JsonDeserialize(using = UnitDeserializer.class) + private Unit unit; + + @JsonIgnore @NonNull @Getter private List values; + + MetricDefinitionCopy(String name) { + this(name, Unit.NONE, new ArrayList<>()); + } + + MetricDefinitionCopy(String name, double value) { + this(name, Unit.NONE, value); + } + + MetricDefinitionCopy(String name, Unit unit, double value) { + this(name, unit, new ArrayList<>(Arrays.asList(value))); + } + } +} diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java new file mode 100644 index 00000000..2106fcc9 --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java @@ -0,0 +1,75 @@ +package software.amazon.cloudwatchlogs.emf.model; + +import static org.junit.Assert.assertEquals; + +import java.util.Collections; +import org.junit.After; +import org.junit.Test; + +public class MetricDirectiveThreadSafetyTest { + private volatile Throwable throwable = null; + + @Test + public void testConcurrentPutMetricWithDifferentKey() throws InterruptedException { + MetricDirective metricDirective = new MetricDirective(); + Thread[] threads = new Thread[100]; + for (int i = 0; i < 100; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + metricDirective.putMetric("Metric-" + id, id); + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertEquals(metricDirective.getAllMetrics().size(), 100); + for (int i = 0; i < 100; i++) { + assertEquals( + metricDirective.getMetrics().get("Metric-" + i).getValues().get(0), i, 1e-5); + } + } + + @Test + public void testConcurrentPutMetricWithSameKey() throws InterruptedException { + MetricDirective metricDirective = new MetricDirective(); + Thread[] threads = new Thread[100]; + for (int i = 0; i < 100; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + metricDirective.putMetric("Metric", id); + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertEquals(metricDirective.getAllMetrics().size(), 1); + MetricDefinition md = metricDirective.getAllMetrics().toArray(new MetricDefinition[0])[0]; + Collections.sort(md.getValues()); + for (int i = 0; i < 100; i++) { + assertEquals(md.getValues().get(i), i, 1e-5); + } + } + + @After + public void tearDown() throws Throwable { + if (throwable != null) throw throwable; + } +} diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java new file mode 100644 index 00000000..3b84b52f --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java @@ -0,0 +1,45 @@ +package software.amazon.cloudwatchlogs.emf.model; + +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import org.junit.After; +import org.junit.Test; + +public class MetricsContextThreadSafetyTest { + private volatile Throwable throwable = null; + + @Test + public void testConcurrentPutMetaData() throws InterruptedException { + MetricsContext mc = new MetricsContext(); + Thread[] threads = new Thread[100]; + for (int i = 0; i < 100; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + mc.putMetadata("MetaData-" + id, String.valueOf(id)); + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + Map metaData = mc.getRootNode().getAws().getCustomMetadata(); + assertEquals(metaData.size(), 100); + for (int i = 0; i < 100; i++) { + assertEquals(metaData.get("MetaData-" + i), String.valueOf(i)); + } + } + + @After + public void tearDown() throws Throwable { + if (throwable != null) throw throwable; + } +} diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/GroupedSinkShunt.java b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/GroupedSinkShunt.java new file mode 100644 index 00000000..d39776e3 --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/GroupedSinkShunt.java @@ -0,0 +1,57 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.cloudwatchlogs.emf.sinks; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import software.amazon.cloudwatchlogs.emf.model.MetricsContext; + +/** + * A mocked sink which can preserve all flushed log events. Useful for testing the result of + * concurrent flushing. + */ +public class GroupedSinkShunt implements ISink { + + private List contexts = new ArrayList<>(); + + private List> logEventList = new ArrayList<>(); + + @Override + public void accept(MetricsContext context) { + this.contexts.add(context); + try { + List logEvent = context.serialize(); + logEventList.add(logEvent); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public CompletableFuture shutdown() { + return CompletableFuture.completedFuture(null); + } + + public List getContexts() { + return contexts; + } + + public List> getLogEventList() { + return this.logEventList; + } +} From 379aaab19e32c36766f77516885340ccee1596ba Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Mon, 18 Jul 2022 23:42:20 -0400 Subject: [PATCH 03/14] Added jmh benchmarking for ReadWriteLock & StampedLock --- .../emf/MetricsLoggerBenchmark.java | 337 ++++++++++++++++++ 1 file changed, 337 insertions(+) create mode 100644 src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java diff --git a/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java b/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java new file mode 100644 index 00000000..60b140aa --- /dev/null +++ b/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java @@ -0,0 +1,337 @@ +package software.amazon.cloudwatchlogs.emf; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.*; +import software.amazon.cloudwatchlogs.emf.environment.Environment; +import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider; +import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger; +import software.amazon.cloudwatchlogs.emf.model.DimensionSet; +import software.amazon.cloudwatchlogs.emf.sinks.SinkShunt; + +@State(Scope.Benchmark) +@BenchmarkMode(Mode.AverageTime) +@Warmup(iterations = 3, time = 5) +@Measurement(iterations = 3, time = 5) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(value = 1) +public class MetricsLoggerBenchmark { + private MetricsLogger logger; + private EnvironmentProvider envProvider; + private SinkShunt sink; + private Environment environment; + + @Setup + public void setUp() { + envProvider = mock(EnvironmentProvider.class); + environment = mock(Environment.class); + sink = new SinkShunt(); + + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(sink); + logger = new MetricsLogger(envProvider); + } + + /** + * Publishing 10000 metrics with single thread. + * no lock: 0.844 ms/op; RW lock: 0.896 ms/op; S lock: 0.884 ms/op + */ + @Benchmark + public void measurePutMetric() { + logger = new MetricsLogger(envProvider); // 0.024 ms/op + + // should make this op dominate running time + for (int i = 0; i < 10000; i++) { + logger.putMetric("Metric-" + i, i); + } + } + + /** + * Flush with single thread. + * no lock: 0.148 ms/op; RW lock: 0.148 ms/op; S lock: 0.147 ms/op + */ + @Benchmark + public void measureFlush() { + logger = new MetricsLogger(envProvider); + + logger.flush(); + + sink.shutdown(); + } + + /** + * Invoke all methods 100 times with single thread. + * no lock: 6.946 ms/op; RW lock: 6.988 ms/op; S lock: 6.823 ms/op + */ + @Benchmark + public void measureAllMethods() { + logger = new MetricsLogger(envProvider); + + for (int j = 0; j < 100; j++) { + logger.putMetadata("MetaData-" + j, j); + logger.putProperty("Property-" + j, j); + logger.putDimensions(DimensionSet.of("Dim-" + j, String.valueOf(j))); + logger.putMetric("Metric-" + j, j); + logger.flush(); + } + + sink.shutdown(); + } + + /** + * Publish 10000 metrics with 5 threads. + * no lock: 0.758 ms/op; RW lock: 2.816 ms/op; S lock: 2.247 ms/op + * @throws InterruptedException + */ + @Benchmark + public void measurePutMetricWith5Threads() throws InterruptedException { + measurePutMetricWithNThreads(5); + } + + /** + * Publish 10000 metrics with 10 threads. + * no lock: 0.949 ms/op; RW lock: 3.823 ms/op; S lock: 3.078 ms/op + * @throws InterruptedException + */ + @Benchmark + public void measurePutMetricWith10Threads() throws InterruptedException { + measurePutMetricWithNThreads(10); + } + + /** + * Publish 10000 metrics with 20 threads. + * no lock: 1.610 ms/op; RW lock: 3.349 ms/op; S lock: 2.644 ms/op + * @throws InterruptedException + */ + @Benchmark + public void measurePutMetricWith20Threads() throws InterruptedException { + measurePutMetricWithNThreads(20); + } + + /** + * Publish 10000 metrics with 50 threads. + * no lock: 4.161 ms/op; RW lock: 4.107 ms/op; S lock: 4.184 ms/op + * @throws InterruptedException + */ + @Benchmark + public void measurePutMetricWith50Threads() throws InterruptedException { + measurePutMetricWithNThreads(50); + } + + /** + * Publish 10000 metrics with 100 threads. + * no lock: 8.648 ms/op; RW lock: 9.071 ms/op; S lock: 8.576 ms/op + * @throws InterruptedException + */ + @Benchmark + public void measurePutMetricWith100Threads() throws InterruptedException { + measurePutMetricWithNThreads(100); + } + + /** + * Flush 1000 times with 5 threads. + * no lock: 7.529 ms/op; RW lock: 22.742 ms/op; S lock: 23.304 ms/op + * @throws InterruptedException + */ + @Benchmark + public void measureFlushWith5Threads() throws InterruptedException { + measureFlushWithNThreads(5); + } + + /** + * Flush 1000 times with 10 threads. + * no lock: 12.900 ms/op; RW lock: 25.015 ms/op; S lock: 24.778 ms/op + * @throws InterruptedException + */ + @Benchmark + public void measureFlushWith10Threads() throws InterruptedException { + measureFlushWithNThreads(10); + } + + /** + * Flush 1000 times with 20 threads. + * no lock: 6.537 ms/op; RW lock: 25.705 ms/op; S lock: 26.465 ms/op + * @throws InterruptedException + */ + @Benchmark + public void measureFlushWith20Threads() throws InterruptedException { + measureFlushWithNThreads(20); + } + + /** + * Flush 1000 times with 50 threads. + * no lock: 24.985 ms/op; RW lock: 31.453 ms/op; S lock: 31.965 ms/op + * @throws InterruptedException + */ + @Benchmark + public void measureFlushWith50Threads() throws InterruptedException { + measureFlushWithNThreads(50); + } + + /** + * Flush 1000 times with 100 threads. + * no lock: 34.527 ms/op; RW lock: 39.606 ms/op; S lock: 40.007 ms/op + * @throws InterruptedException + */ + @Benchmark + public void measureFlushWith100Threads() throws InterruptedException { + measureFlushWithNThreads(100); + } + + /** + * Flush 1000 times with 1000 threads. + * no lock: 116.047 ms/op; RW lock: 141.227 ms/op; S lock: 141.597 ms/op + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 10) + @Measurement(time = 10) + public void measureFlushWith1000Threads() throws InterruptedException { + measureFlushWithNThreads(1000); + } + + /** + * Execute all methods for 1000 times with 5 threads. + * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): + * 84.041 ± 24.965 ms/op; + * RW lock: 264.439 ± 8.070 ms/op; S lock: 264.630 ± 24.252 ms/op + * @throws InterruptedException + */ + @Benchmark + public void measureAllMethodsWith5Threads() throws InterruptedException { + measureAllMethodsWithNThreads(5); + } + + /** + * Execute all methods for 1000 times with 10 threads. + * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): + * 41.174 ± 6.084 ms/op; + * RW lock: 263.103 ± 15.141 ms/op; S lock: 256.267 ± 30.922 ms/op + * @throws InterruptedException + */ + @Benchmark + public void measureAllMethodsWith10Threads() throws InterruptedException { + measureAllMethodsWithNThreads(10); + } + + /** + * Execute all methods for 1000 times with 100 threads. + * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): + * 35.779 ± 2.414 ms/op; + * RW lock: 315.340 ± 16.074 ms/op; S lock: 288.459 ± 5.801 ms/op + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 10) + @Measurement(time = 10) + public void measureAllMethodsWith100Threads() throws InterruptedException { + measureAllMethodsWithNThreads(100); + } + + /** + * Execute all methods for 1000 times with 500 threads. + * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): + * 81.785 ± 11.616 ms/op; + * RW lock: 346.697 ± 51.133 ms/op; S lock: 368.981 ± 161.049 ms/op + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 10) + @Measurement(time = 10) + public void measureAllMethodsWith500Threads() throws InterruptedException { + measureAllMethodsWithNThreads(500); + } + + /** + * Execute all methods for 1000 times with 1000 threads. + * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): + * 218.505 ± 178.808 ms/op; + * RW lock: 436.380 ± 317.130 ms/op; S lock: 390.074 ± 100 ms/op + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 10) + @Measurement(time = 10) + public void measureAllMethodsWith1000Threads() throws InterruptedException { + measureAllMethodsWithNThreads(1000); + } + + private void measurePutMetricWithNThreads(int n) throws InterruptedException { + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[n]; + + for (int i = 0; i < n; i++) { + final int id = i; + int batchSize = 10000 / n; + threads[i] = + new Thread( + () -> { + for (int j = batchSize * id; j < batchSize * id + batchSize; j++) { + logger.putMetric("Metric-" + j, j); + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + } + + private void measureFlushWithNThreads(int n) throws InterruptedException { + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[n]; + + for (int i = 0; i < n; i++) { + final int id = i; + int batchSize = 1000 / n; + threads[i] = + new Thread( + () -> { + for (int j = batchSize * id; j < batchSize * id + batchSize; j++) { + logger.flush(); + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + sink.shutdown(); + } + + private void measureAllMethodsWithNThreads(int n) throws InterruptedException { + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[n]; + + for (int i = 0; i < n; i++) { + final int id = i; + int batchSize = 1000 / n; + threads[i] = + new Thread( + () -> { + for (int j = batchSize * id; j < batchSize * id + batchSize; j++) { + logger.putMetadata("MetaData-" + id, id); + logger.putProperty("Property-" + id, id); + logger.putDimensions( + DimensionSet.of("Dim-" + id, String.valueOf(id))); + logger.putMetric("Metric-" + j, j); + logger.flush(); + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + sink.shutdown(); + } +} From 19a1ef64494ea42612a03ae3037488b6cb3a5875 Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Tue, 19 Jul 2022 16:29:23 -0400 Subject: [PATCH 04/14] Adjusted the code format --- .../emf/MetricsLoggerBenchmark.java | 108 ++++++++++-------- .../logger/MetricsLoggerThreadSafetyTest.java | 9 +- 2 files changed, 61 insertions(+), 56 deletions(-) diff --git a/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java b/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java index 60b140aa..775d999e 100644 --- a/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java +++ b/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java @@ -37,8 +37,8 @@ public void setUp() { } /** - * Publishing 10000 metrics with single thread. - * no lock: 0.844 ms/op; RW lock: 0.896 ms/op; S lock: 0.884 ms/op + * Publishing 10000 metrics with single thread. no lock: 0.844 ms/op; RW lock: 0.896 ms/op; S + * lock: 0.884 ms/op */ @Benchmark public void measurePutMetric() { @@ -50,10 +50,7 @@ public void measurePutMetric() { } } - /** - * Flush with single thread. - * no lock: 0.148 ms/op; RW lock: 0.148 ms/op; S lock: 0.147 ms/op - */ + /** Flush with single thread. no lock: 0.148 ms/op; RW lock: 0.148 ms/op; S lock: 0.147 ms/op */ @Benchmark public void measureFlush() { logger = new MetricsLogger(envProvider); @@ -64,8 +61,8 @@ public void measureFlush() { } /** - * Invoke all methods 100 times with single thread. - * no lock: 6.946 ms/op; RW lock: 6.988 ms/op; S lock: 6.823 ms/op + * Invoke all methods 100 times with single thread. no lock: 6.946 ms/op; RW lock: 6.988 ms/op; + * S lock: 6.823 ms/op */ @Benchmark public void measureAllMethods() { @@ -83,8 +80,9 @@ public void measureAllMethods() { } /** - * Publish 10000 metrics with 5 threads. - * no lock: 0.758 ms/op; RW lock: 2.816 ms/op; S lock: 2.247 ms/op + * Publish 10000 metrics with 5 threads. no lock: 0.758 ms/op; RW lock: 2.816 ms/op; S lock: + * 2.247 ms/op + * * @throws InterruptedException */ @Benchmark @@ -93,8 +91,9 @@ public void measurePutMetricWith5Threads() throws InterruptedException { } /** - * Publish 10000 metrics with 10 threads. - * no lock: 0.949 ms/op; RW lock: 3.823 ms/op; S lock: 3.078 ms/op + * Publish 10000 metrics with 10 threads. no lock: 0.949 ms/op; RW lock: 3.823 ms/op; S lock: + * 3.078 ms/op + * * @throws InterruptedException */ @Benchmark @@ -103,8 +102,9 @@ public void measurePutMetricWith10Threads() throws InterruptedException { } /** - * Publish 10000 metrics with 20 threads. - * no lock: 1.610 ms/op; RW lock: 3.349 ms/op; S lock: 2.644 ms/op + * Publish 10000 metrics with 20 threads. no lock: 1.610 ms/op; RW lock: 3.349 ms/op; S lock: + * 2.644 ms/op + * * @throws InterruptedException */ @Benchmark @@ -113,8 +113,9 @@ public void measurePutMetricWith20Threads() throws InterruptedException { } /** - * Publish 10000 metrics with 50 threads. - * no lock: 4.161 ms/op; RW lock: 4.107 ms/op; S lock: 4.184 ms/op + * Publish 10000 metrics with 50 threads. no lock: 4.161 ms/op; RW lock: 4.107 ms/op; S lock: + * 4.184 ms/op + * * @throws InterruptedException */ @Benchmark @@ -123,8 +124,9 @@ public void measurePutMetricWith50Threads() throws InterruptedException { } /** - * Publish 10000 metrics with 100 threads. - * no lock: 8.648 ms/op; RW lock: 9.071 ms/op; S lock: 8.576 ms/op + * Publish 10000 metrics with 100 threads. no lock: 8.648 ms/op; RW lock: 9.071 ms/op; S lock: + * 8.576 ms/op + * * @throws InterruptedException */ @Benchmark @@ -133,8 +135,9 @@ public void measurePutMetricWith100Threads() throws InterruptedException { } /** - * Flush 1000 times with 5 threads. - * no lock: 7.529 ms/op; RW lock: 22.742 ms/op; S lock: 23.304 ms/op + * Flush 1000 times with 5 threads. no lock: 7.529 ms/op; RW lock: 22.742 ms/op; S lock: 23.304 + * ms/op + * * @throws InterruptedException */ @Benchmark @@ -143,8 +146,9 @@ public void measureFlushWith5Threads() throws InterruptedException { } /** - * Flush 1000 times with 10 threads. - * no lock: 12.900 ms/op; RW lock: 25.015 ms/op; S lock: 24.778 ms/op + * Flush 1000 times with 10 threads. no lock: 12.900 ms/op; RW lock: 25.015 ms/op; S lock: + * 24.778 ms/op + * * @throws InterruptedException */ @Benchmark @@ -153,8 +157,9 @@ public void measureFlushWith10Threads() throws InterruptedException { } /** - * Flush 1000 times with 20 threads. - * no lock: 6.537 ms/op; RW lock: 25.705 ms/op; S lock: 26.465 ms/op + * Flush 1000 times with 20 threads. no lock: 6.537 ms/op; RW lock: 25.705 ms/op; S lock: 26.465 + * ms/op + * * @throws InterruptedException */ @Benchmark @@ -163,8 +168,9 @@ public void measureFlushWith20Threads() throws InterruptedException { } /** - * Flush 1000 times with 50 threads. - * no lock: 24.985 ms/op; RW lock: 31.453 ms/op; S lock: 31.965 ms/op + * Flush 1000 times with 50 threads. no lock: 24.985 ms/op; RW lock: 31.453 ms/op; S lock: + * 31.965 ms/op + * * @throws InterruptedException */ @Benchmark @@ -173,8 +179,9 @@ public void measureFlushWith50Threads() throws InterruptedException { } /** - * Flush 1000 times with 100 threads. - * no lock: 34.527 ms/op; RW lock: 39.606 ms/op; S lock: 40.007 ms/op + * Flush 1000 times with 100 threads. no lock: 34.527 ms/op; RW lock: 39.606 ms/op; S lock: + * 40.007 ms/op + * * @throws InterruptedException */ @Benchmark @@ -183,8 +190,9 @@ public void measureFlushWith100Threads() throws InterruptedException { } /** - * Flush 1000 times with 1000 threads. - * no lock: 116.047 ms/op; RW lock: 141.227 ms/op; S lock: 141.597 ms/op + * Flush 1000 times with 1000 threads. no lock: 116.047 ms/op; RW lock: 141.227 ms/op; S lock: + * 141.597 ms/op + * * @throws InterruptedException */ @Benchmark @@ -195,10 +203,10 @@ public void measureFlushWith1000Threads() throws InterruptedException { } /** - * Execute all methods for 1000 times with 5 threads. - * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): - * 84.041 ± 24.965 ms/op; - * RW lock: 264.439 ± 8.070 ms/op; S lock: 264.630 ± 24.252 ms/op + * Execute all methods for 1000 times with 5 threads. no lock (need to sync getAllDimensions() & + * getAllDimensionKeys() in MetricsDirective): 84.041 ± 24.965 ms/op; RW lock: 264.439 ± 8.070 + * ms/op; S lock: 264.630 ± 24.252 ms/op + * * @throws InterruptedException */ @Benchmark @@ -207,10 +215,10 @@ public void measureAllMethodsWith5Threads() throws InterruptedException { } /** - * Execute all methods for 1000 times with 10 threads. - * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): - * 41.174 ± 6.084 ms/op; - * RW lock: 263.103 ± 15.141 ms/op; S lock: 256.267 ± 30.922 ms/op + * Execute all methods for 1000 times with 10 threads. no lock (need to sync getAllDimensions() + * & getAllDimensionKeys() in MetricsDirective): 41.174 ± 6.084 ms/op; RW lock: 263.103 ± 15.141 + * ms/op; S lock: 256.267 ± 30.922 ms/op + * * @throws InterruptedException */ @Benchmark @@ -219,10 +227,10 @@ public void measureAllMethodsWith10Threads() throws InterruptedException { } /** - * Execute all methods for 1000 times with 100 threads. - * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): - * 35.779 ± 2.414 ms/op; - * RW lock: 315.340 ± 16.074 ms/op; S lock: 288.459 ± 5.801 ms/op + * Execute all methods for 1000 times with 100 threads. no lock (need to sync getAllDimensions() + * & getAllDimensionKeys() in MetricsDirective): 35.779 ± 2.414 ms/op; RW lock: 315.340 ± 16.074 + * ms/op; S lock: 288.459 ± 5.801 ms/op + * * @throws InterruptedException */ @Benchmark @@ -233,10 +241,10 @@ public void measureAllMethodsWith100Threads() throws InterruptedException { } /** - * Execute all methods for 1000 times with 500 threads. - * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): - * 81.785 ± 11.616 ms/op; - * RW lock: 346.697 ± 51.133 ms/op; S lock: 368.981 ± 161.049 ms/op + * Execute all methods for 1000 times with 500 threads. no lock (need to sync getAllDimensions() + * & getAllDimensionKeys() in MetricsDirective): 81.785 ± 11.616 ms/op; RW lock: 346.697 ± + * 51.133 ms/op; S lock: 368.981 ± 161.049 ms/op + * * @throws InterruptedException */ @Benchmark @@ -247,10 +255,10 @@ public void measureAllMethodsWith500Threads() throws InterruptedException { } /** - * Execute all methods for 1000 times with 1000 threads. - * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): - * 218.505 ± 178.808 ms/op; - * RW lock: 436.380 ± 317.130 ms/op; S lock: 390.074 ± 100 ms/op + * Execute all methods for 1000 times with 1000 threads. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 218.505 ± 178.808 ms/op; RW + * lock: 436.380 ± 317.130 ms/op; S lock: 390.074 ± 100 ms/op + * * @throws InterruptedException */ @Benchmark diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java index 69c87e45..1db0da68 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java @@ -74,8 +74,7 @@ public void testConcurrentPutProperty() throws InterruptedException { logger.flush(); for (int i = 0; i < 100; i++) { - Assert.assertEquals( - sink.getContext().getProperty("Property-" + i), String.valueOf(i)); + Assert.assertEquals(sink.getContext().getProperty("Property-" + i), String.valueOf(i)); } } @@ -108,8 +107,7 @@ public void testConcurrentPutDimension() throws InterruptedException { // check size Assert.assertEquals(sink.getContext().getDimensions().size(), 100); for (DimensionSet dim : dimensions) { - Assert.assertEquals( - dim.getDimensionKeys().size(), 4); // there are 3 default dimensions + Assert.assertEquals(dim.getDimensionKeys().size(), 4); // there are 3 default dimensions } // check content Collections.sort( @@ -152,8 +150,7 @@ public void testConcurrentPutDimensionAfterSetDimension() throws InterruptedExce Assert.assertEquals(sink.getContext().getDimensions().size(), 100); for (DimensionSet dim : dimensions) { Assert.assertEquals( - dim.getDimensionKeys().size(), - 1); // there are no default dimensions after set + dim.getDimensionKeys().size(), 1); // there are no default dimensions after set } // check content Collections.sort( From 1b6c823716f196e6ac1eb7feacc0e5bbb1eb2b73 Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Tue, 26 Jul 2022 23:09:26 -0400 Subject: [PATCH 05/14] Made some changes to test cases to introduce more concurrency --- .../logger/MetricsLoggerThreadSafetyTest.java | 55 +++++++++++++------ .../MetricDirectiveThreadSafetyTest.java | 22 ++++++-- .../model/MetricsContextThreadSafetyTest.java | 14 +++-- 3 files changed, 64 insertions(+), 27 deletions(-) diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java index 1db0da68..5db095b6 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java @@ -54,13 +54,19 @@ public void setUp() { public void testConcurrentPutProperty() throws InterruptedException { logger = new MetricsLogger(envProvider); Thread[] threads = new Thread[100]; + long targetTimestampToRun = System.currentTimeMillis() + 500; + for (int i = 0; i < 100; i++) { final int id = i; threads[i] = new Thread( () -> { try { - logger.putProperty("Property-" + id, String.valueOf(id)); + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < 1000; j++) { + int propertyId = 1000 * id + j; + logger.putProperty("Property-" + propertyId, String.valueOf(propertyId)); + } } catch (Throwable e) { throwable = e; // ensure no exceptions are thrown } @@ -73,7 +79,7 @@ public void testConcurrentPutProperty() throws InterruptedException { } logger.flush(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 100000; i++) { Assert.assertEquals(sink.getContext().getProperty("Property-" + i), String.valueOf(i)); } } @@ -82,14 +88,19 @@ public void testConcurrentPutProperty() throws InterruptedException { public void testConcurrentPutDimension() throws InterruptedException { logger = new MetricsLogger(envProvider); Thread[] threads = new Thread[100]; + long targetTimestampToRun = System.currentTimeMillis() + 500; + for (int i = 0; i < 100; i++) { final int id = i; threads[i] = new Thread( () -> { try { - logger.putDimensions( - DimensionSet.of("Dim", String.valueOf(id))); + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < 1000; j++) { + int dimensionId = 1000 * id + j; + logger.putDimensions(DimensionSet.of("Dim", String.valueOf(dimensionId))); + } } catch (Throwable e) { throwable = e; } @@ -105,7 +116,7 @@ public void testConcurrentPutDimension() throws InterruptedException { List dimensions = sink.getContext().getDimensions(); // check size - Assert.assertEquals(sink.getContext().getDimensions().size(), 100); + Assert.assertEquals(sink.getContext().getDimensions().size(), 100000); for (DimensionSet dim : dimensions) { Assert.assertEquals(dim.getDimensionKeys().size(), 4); // there are 3 default dimensions } @@ -113,7 +124,7 @@ public void testConcurrentPutDimension() throws InterruptedException { Collections.sort( dimensions, Comparator.comparingInt(d -> Integer.parseInt(d.getDimensionValue("Dim")))); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 100000; i++) { Assert.assertEquals(dimensions.get(i).getDimensionValue("Dim"), String.valueOf(i)); } } @@ -122,16 +133,20 @@ public void testConcurrentPutDimension() throws InterruptedException { public void testConcurrentPutDimensionAfterSetDimension() throws InterruptedException { logger = new MetricsLogger(envProvider); logger.setDimensions(DimensionSet.of("Dim", "0")); + long targetTimestampToRun = System.currentTimeMillis() + 500; - Thread[] threads = new Thread[99]; - for (int i = 0; i < 99; i++) { + Thread[] threads = new Thread[100]; + for (int i = 0; i < 100; i++) { final int id = i; threads[i] = new Thread( () -> { try { - logger.putDimensions( - DimensionSet.of("Dim", String.valueOf(id + 1))); + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < 1000; j++) { + int dimensionId = 1000 * id + j + 1; + logger.putDimensions(DimensionSet.of("Dim", String.valueOf(dimensionId))); + } } catch (Throwable e) { throwable = e; } @@ -147,7 +162,7 @@ public void testConcurrentPutDimensionAfterSetDimension() throws InterruptedExce List dimensions = sink.getContext().getDimensions(); // check size - Assert.assertEquals(sink.getContext().getDimensions().size(), 100); + Assert.assertEquals(sink.getContext().getDimensions().size(), 100001); for (DimensionSet dim : dimensions) { Assert.assertEquals( dim.getDimensionKeys().size(), 1); // there are no default dimensions after set @@ -156,7 +171,7 @@ public void testConcurrentPutDimensionAfterSetDimension() throws InterruptedExce Collections.sort( dimensions, Comparator.comparingInt(d -> Integer.parseInt(d.getDimensionValue("Dim")))); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 100001; i++) { Assert.assertEquals(dimensions.get(i).getDimensionValue("Dim"), String.valueOf(i)); } } @@ -170,12 +185,15 @@ public void testConcurrentFlush() throws InterruptedException, JsonProcessingExc logger = new MetricsLogger(envProvider); Thread[] threads = new Thread[100]; + long targetTimestampToRun = System.currentTimeMillis() + 500; + for (int i = 0; i < 100; i++) { final int id = i; threads[i] = new Thread( () -> { try { + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); logger.putMetric("Metric-" + id, id); logger.flush(); } catch (Throwable e) { @@ -215,18 +233,17 @@ public void testConcurrentFlushAndPutMetric() when(environment.getSink()).thenReturn(groupedSink); logger = new MetricsLogger(envProvider); - Random rand = new Random(); + long targetTimestampToRun = System.currentTimeMillis() + 500; Thread[] threads = new Thread[100]; for (int i = 0; i < 100; i++) { final int id = i; - int randTime = rand.nextInt(1000); threads[i] = new Thread( () -> { try { // half threads do putMetric(), half do flush() - Thread.sleep(randTime); + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); if (id % 2 == 0) { for (int j = id; j < id + 2; j++) { logger.putMetric("Metric-" + j, j); @@ -271,17 +288,19 @@ public void testConcurrentFlushAndMethodsOtherThanPutMetric() throws Interrupted when(environment.getSink()).thenReturn(groupedSink); logger = new MetricsLogger(envProvider); - Random rand = new Random(); +// Random rand = new Random(); + long targetTimestampToRun = System.currentTimeMillis() + 500; Thread[] threads = new Thread[100]; for (int i = 0; i < 100; i++) { final int id = i; - int randTime = rand.nextInt(1000); +// int randTime = rand.nextInt(1000); threads[i] = new Thread( () -> { try { - Thread.sleep(randTime); +// Thread.sleep(randTime); + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); if (id < 30) { logger.putDimensions( DimensionSet.of("Dim", String.valueOf(id))); diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java index 2106fcc9..e644d27f 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java @@ -13,13 +13,19 @@ public class MetricDirectiveThreadSafetyTest { public void testConcurrentPutMetricWithDifferentKey() throws InterruptedException { MetricDirective metricDirective = new MetricDirective(); Thread[] threads = new Thread[100]; + long targetTimestampToRun = System.currentTimeMillis() + 500; // all threads should target running on this timestamp + for (int i = 0; i < 100; i++) { final int id = i; threads[i] = new Thread( () -> { try { - metricDirective.putMetric("Metric-" + id, id); + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); // try to make all threads run at same time + for (int j = 0; j < 1000; j++) { + int metricId = 1000 * id + j; + metricDirective.putMetric("Metric-" + metricId, metricId); + } } catch (Throwable e) { throwable = e; } @@ -31,8 +37,8 @@ public void testConcurrentPutMetricWithDifferentKey() throws InterruptedExceptio t.join(); } - assertEquals(metricDirective.getAllMetrics().size(), 100); - for (int i = 0; i < 100; i++) { + assertEquals(metricDirective.getAllMetrics().size(), 100000); + for (int i = 0; i < 100000; i++) { assertEquals( metricDirective.getMetrics().get("Metric-" + i).getValues().get(0), i, 1e-5); } @@ -42,13 +48,19 @@ public void testConcurrentPutMetricWithDifferentKey() throws InterruptedExceptio public void testConcurrentPutMetricWithSameKey() throws InterruptedException { MetricDirective metricDirective = new MetricDirective(); Thread[] threads = new Thread[100]; + long targetTimestampToRun = System.currentTimeMillis() + 500; + for (int i = 0; i < 100; i++) { final int id = i; threads[i] = new Thread( () -> { try { - metricDirective.putMetric("Metric", id); + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < 1000; j++) { + int metricId = 1000 * id + j; + metricDirective.putMetric("Metric", metricId); + } } catch (Throwable e) { throwable = e; } @@ -63,7 +75,7 @@ public void testConcurrentPutMetricWithSameKey() throws InterruptedException { assertEquals(metricDirective.getAllMetrics().size(), 1); MetricDefinition md = metricDirective.getAllMetrics().toArray(new MetricDefinition[0])[0]; Collections.sort(md.getValues()); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 100000; i++) { assertEquals(md.getValues().get(i), i, 1e-5); } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java index 3b84b52f..c2810d9e 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java @@ -13,13 +13,19 @@ public class MetricsContextThreadSafetyTest { public void testConcurrentPutMetaData() throws InterruptedException { MetricsContext mc = new MetricsContext(); Thread[] threads = new Thread[100]; + long targetTimestampToRun = System.currentTimeMillis() + 500; + for (int i = 0; i < 100; i++) { final int id = i; threads[i] = new Thread( () -> { try { - mc.putMetadata("MetaData-" + id, String.valueOf(id)); + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < 1000; j++) { + int metaDataId = 1000 * id + j; + mc.putMetadata("MetaData-" + metaDataId, metaDataId); + } } catch (Throwable e) { throwable = e; } @@ -32,9 +38,9 @@ public void testConcurrentPutMetaData() throws InterruptedException { } Map metaData = mc.getRootNode().getAws().getCustomMetadata(); - assertEquals(metaData.size(), 100); - for (int i = 0; i < 100; i++) { - assertEquals(metaData.get("MetaData-" + i), String.valueOf(i)); + assertEquals(metaData.size(), 100000); + for (int i = 0; i < 100000; i++) { + assertEquals(metaData.get("MetaData-" + i), i); } } From 5b593a7ae06da36dad06a65f8074ffd295333921 Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Thu, 28 Jul 2022 10:41:49 -0400 Subject: [PATCH 06/14] Finished concurrency test case revision --- .../emf/logger/MetricsLogger.java | 46 +++--- .../logger/MetricsLoggerThreadSafetyTest.java | 154 ++++++++++-------- .../MetricDirectiveThreadSafetyTest.java | 1 + .../model/MetricsContextThreadSafetyTest.java | 1 + 4 files changed, 113 insertions(+), 89 deletions(-) diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java index 9e8e3d95..cfcdbab1 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java @@ -19,6 +19,8 @@ import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + import lombok.extern.slf4j.Slf4j; import software.amazon.cloudwatchlogs.emf.environment.Environment; import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider; @@ -91,13 +93,10 @@ public void flush() { * @return the current logger */ public MetricsLogger putProperty(String key, Object value) { - rwl.readLock().lock(); - try { + return applyReadLock(() -> { this.context.putProperty(key, value); return this; - } finally { - rwl.readLock().unlock(); - } + }); } /** @@ -112,13 +111,10 @@ public MetricsLogger putProperty(String key, Object value) { * @return the current logger */ public MetricsLogger putDimensions(DimensionSet dimensions) { - rwl.readLock().lock(); - try { + return applyReadLock(() -> { context.putDimension(dimensions); return this; - } finally { - rwl.readLock().unlock(); - } + }); } /** @@ -131,13 +127,10 @@ public MetricsLogger putDimensions(DimensionSet dimensions) { * @return the current logger */ public MetricsLogger setDimensions(DimensionSet... dimensionSets) { - rwl.readLock().lock(); - try { + return applyReadLock(() -> { context.setDimensions(dimensionSets); return this; - } finally { - rwl.readLock().unlock(); - } + }); } /** @@ -151,13 +144,10 @@ public MetricsLogger setDimensions(DimensionSet... dimensionSets) { * @return the current logger */ public MetricsLogger putMetric(String key, double value, Unit unit) { - rwl.readLock().lock(); - try { + return applyReadLock(() -> { this.context.putMetric(key, value, unit); return this; - } finally { - rwl.readLock().unlock(); - } + }); } /** @@ -185,13 +175,10 @@ public MetricsLogger putMetric(String key, double value) { * @return the current logger */ public MetricsLogger putMetadata(String key, Object value) { - rwl.readLock().lock(); - try { + return applyReadLock(() -> { this.context.putMetadata(key, value); return this; - } finally { - rwl.readLock().unlock(); - } + }); } /** @@ -227,4 +214,13 @@ private void configureContextForEnvironment(MetricsContext context, Environment context.setDefaultDimensions(defaultDimension); environment.configureContext(context); } + + private MetricsLogger applyReadLock(Supplier any) { + rwl.readLock().lock(); + try { + return any.get(); + } finally { + rwl.readLock().unlock(); + } + } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java index 5db095b6..aa3056c7 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java @@ -35,14 +35,13 @@ public class MetricsLoggerThreadSafetyTest { private EnvironmentProvider envProvider; private SinkShunt sink; private Environment environment; - private volatile Throwable throwable; + private volatile Throwable throwable = null; @Before public void setUp() { envProvider = mock(EnvironmentProvider.class); environment = mock(Environment.class); sink = new SinkShunt(); - throwable = null; when(envProvider.resolveEnvironment()) .thenReturn(CompletableFuture.completedFuture(environment)); @@ -184,15 +183,16 @@ public void testConcurrentFlush() throws InterruptedException, JsonProcessingExc when(environment.getSink()).thenReturn(groupedSink); logger = new MetricsLogger(envProvider); - Thread[] threads = new Thread[100]; - long targetTimestampToRun = System.currentTimeMillis() + 500; + Thread[] threads = new Thread[300]; + long targetTimestampToRun = System.currentTimeMillis() + 1000; - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 300; i++) { final int id = i; threads[i] = new Thread( () -> { try { + // try to putMetric() and flush() at the same time Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); logger.putMetric("Metric-" + id, id); logger.flush(); @@ -209,16 +209,16 @@ public void testConcurrentFlush() throws InterruptedException, JsonProcessingExc ArrayList allMetrics = new ArrayList<>(); for (List events : groupedSink.getLogEventList()) { - ArrayList metrics = parseMetrics(events); + ArrayList metrics = parseAllMetrics(events); allMetrics.addAll(metrics); } - assertEquals(allMetrics.size(), 100); + assertEquals(allMetrics.size(), 300); for (MetricDefinitionCopy metric : allMetrics) { assertEquals(metric.getValues().size(), 1); } Collections.sort(allMetrics, Comparator.comparingDouble(m -> m.getValues().get(0))); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 300; i++) { assertEquals(allMetrics.get(i).getName(), "Metric-" + i); assertEquals(allMetrics.get(i).getValues().get(0), i, 1e-5); } @@ -233,19 +233,21 @@ public void testConcurrentFlushAndPutMetric() when(environment.getSink()).thenReturn(groupedSink); logger = new MetricsLogger(envProvider); - long targetTimestampToRun = System.currentTimeMillis() + 500; + Random rand = new Random(); - Thread[] threads = new Thread[100]; - for (int i = 0; i < 100; i++) { + Thread[] threads = new Thread[500]; + for (int i = 0; i < 500; i++) { final int id = i; + int randTime = rand.nextInt(1000); threads[i] = new Thread( () -> { try { // half threads do putMetric(), half do flush() - Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + // sleep to introduce more chaos in thread ordering + Thread.sleep(randTime); if (id % 2 == 0) { - for (int j = id; j < id + 2; j++) { + for (int j = id * 500; j < id * 500 + 1000; j++) { logger.putMetric("Metric-" + j, j); } } else { @@ -265,16 +267,16 @@ public void testConcurrentFlushAndPutMetric() ArrayList allMetrics = new ArrayList<>(); for (List events : groupedSink.getLogEventList()) { - ArrayList metrics = parseMetrics(events); + ArrayList metrics = parseAllMetrics(events); allMetrics.addAll(metrics); } - assertEquals(allMetrics.size(), 100); + assertEquals(allMetrics.size(), 250000); for (MetricDefinitionCopy metric : allMetrics) { assertEquals(metric.getValues().size(), 1); } Collections.sort(allMetrics, Comparator.comparingDouble(m -> m.getValues().get(0))); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 250000; i++) { assertEquals(allMetrics.get(i).getName(), "Metric-" + i); assertEquals(allMetrics.get(i).getValues().get(0), i, 1e-5); } @@ -288,24 +290,26 @@ public void testConcurrentFlushAndMethodsOtherThanPutMetric() throws Interrupted when(environment.getSink()).thenReturn(groupedSink); logger = new MetricsLogger(envProvider); -// Random rand = new Random(); - long targetTimestampToRun = System.currentTimeMillis() + 500; + Random rand = new Random(); - Thread[] threads = new Thread[100]; - for (int i = 0; i < 100; i++) { + Thread[] threads = new Thread[600]; + for (int i = 0; i < 600; i++) { final int id = i; -// int randTime = rand.nextInt(1000); + int randTime = rand.nextInt(1000); threads[i] = new Thread( () -> { try { -// Thread.sleep(randTime); - Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); - if (id < 30) { - logger.putDimensions( - DimensionSet.of("Dim", String.valueOf(id))); - } else if (id < 40) { - logger.putProperty("Property-" + id, id); + Thread.sleep(randTime); + if (id < 200) { + for (int j = id * 100; j < id * 100 + 100; j++) { + logger.putDimensions( + DimensionSet.of("Dim", String.valueOf(j))); + } + } else if (id < 400) { + for (int k = id * 100; k < id * 100 + 100; k++) { + logger.putProperty("Property-" + k, k); + } } else { logger.flush(); } @@ -324,71 +328,93 @@ public void testConcurrentFlushAndMethodsOtherThanPutMetric() throws Interrupted int contextNum = groupedSink.getContexts().size(); MetricsContext finalContext = groupedSink.getContexts().get(contextNum - 1); List dimensions = finalContext.getDimensions(); - // check size - assertEquals(dimensions.size(), 30); + + // check dimension size + assertEquals(dimensions.size(), 20000); for (DimensionSet dim : dimensions) { Assert.assertEquals(dim.getDimensionKeys().size(), 4); // there are 3 default dimensions } - // check content + // check dimension content Collections.sort( dimensions, Comparator.comparingInt(d -> Integer.parseInt(d.getDimensionValue("Dim")))); - for (int i = 0; i < 30; i++) { + for (int i = 0; i < 2000; i++) { Assert.assertEquals(dimensions.get(i).getDimensionValue("Dim"), String.valueOf(i)); } + // check property int propertyCnt = 0; for (MetricsContext mc : groupedSink.getContexts()) { - propertyCnt += mc.getProperty("Property-30") == null ? 0 : 1; - propertyCnt += mc.getProperty("Property-31") == null ? 0 : 1; - propertyCnt += mc.getProperty("Property-32") == null ? 0 : 1; - propertyCnt += mc.getProperty("Property-33") == null ? 0 : 1; - propertyCnt += mc.getProperty("Property-34") == null ? 0 : 1; - propertyCnt += mc.getProperty("Property-35") == null ? 0 : 1; - propertyCnt += mc.getProperty("Property-36") == null ? 0 : 1; - propertyCnt += mc.getProperty("Property-37") == null ? 0 : 1; - propertyCnt += mc.getProperty("Property-38") == null ? 0 : 1; - propertyCnt += mc.getProperty("Property-39") == null ? 0 : 1; - } - assertEquals(propertyCnt, 10); + for (int i = 20000; i < 40000; i++) { + propertyCnt += mc.getProperty("Property-" + i) == null ? 0 : 1; + } + } + assertEquals(propertyCnt, 20000); } @After public void tearDown() throws Throwable { if (throwable != null) throw throwable; + throwable = null; // reset throwable to prevent repeat throwing } private Map parseRootNode(String event) throws JsonProcessingException { return new JsonMapper().readValue(event, new TypeReference>() {}); } - @SuppressWarnings("unchecked") - // can not parse all metrics if metric number exceeds MAX_METRICS_PER_EVENT - private ArrayList parseMetrics(List events) - throws JsonProcessingException { - Map rootNode = parseRootNode(events.get(0)); - Map metadata = (Map) rootNode.get("_aws"); + @Test + public void testParseMetrics() throws JsonProcessingException { + for (int i = 0; i < 150; i++) { + logger.putMetric("Metric-" + i, i); + } + logger.flush(); - if (metadata == null) { - return new ArrayList<>(); + ArrayList metrics = parseAllMetrics(sink.getLogEvents()); + System.out.println(metrics.size()); + for (String line : sink.getLogEvents()) { + System.out.println(line); } - ArrayList> metricDirectives = - (ArrayList>) metadata.get("CloudWatchMetrics"); - ArrayList> metrics = - (ArrayList>) metricDirectives.get(0).get("Metrics"); + for (MetricDefinitionCopy metric : metrics) { + assertEquals(metric.getValues().size(), 1); + } + Collections.sort(metrics, Comparator.comparingDouble(m -> m.getValues().get(0))); + for (int i = 0; i < 150; i++) { + assertEquals(metrics.get(i).getName(), "Metric-" + i); + assertEquals(metrics.get(i).getValues().get(0), i, 1e-5); + } + } + @SuppressWarnings("unchecked") + // can parse all metrics even if metric number exceeds MAX_METRICS_PER_EVENT + private ArrayList parseAllMetrics(List events) + throws JsonProcessingException { ArrayList metricDefinitions = new ArrayList<>(); - for (Map metric : metrics) { - String name = metric.get("Name"); - Unit unit = Unit.fromValue(metric.get("Unit")); - Object value = rootNode.get(name); - if (value instanceof ArrayList) { - metricDefinitions.add(new MetricDefinitionCopy(name, unit, (ArrayList) value)); - } else { - metricDefinitions.add(new MetricDefinitionCopy(name, unit, (double) value)); + for (String event : events) { + Map rootNode = parseRootNode(event); + Map metadata = (Map) rootNode.get("_aws"); + + if (metadata == null) { + continue; + } + + ArrayList> metricDirectives = + (ArrayList>) metadata.get("CloudWatchMetrics"); + ArrayList> metrics = + (ArrayList>) metricDirectives.get(0).get("Metrics"); + + for (Map metric : metrics) { + String name = metric.get("Name"); + Unit unit = Unit.fromValue(metric.get("Unit")); + Object value = rootNode.get(name); + if (value instanceof ArrayList) { + metricDefinitions.add(new MetricDefinitionCopy(name, unit, (ArrayList) value)); + } else { + metricDefinitions.add(new MetricDefinitionCopy(name, unit, (double) value)); + } } } + return metricDefinitions; } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java index e644d27f..f2b0bcde 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java @@ -83,5 +83,6 @@ public void testConcurrentPutMetricWithSameKey() throws InterruptedException { @After public void tearDown() throws Throwable { if (throwable != null) throw throwable; + throwable = null; } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java index c2810d9e..3111e88f 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java @@ -47,5 +47,6 @@ public void testConcurrentPutMetaData() throws InterruptedException { @After public void tearDown() throws Throwable { if (throwable != null) throw throwable; + throwable = null; } } From 4eed2f8df1d68201e3812096800d40e0b31e6ed7 Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Mon, 1 Aug 2022 16:37:19 -0400 Subject: [PATCH 07/14] Changed benchmark to use fixed batch size and updated the results --- .../emf/MetricsLoggerBenchmark.java | 142 +++++++++--------- 1 file changed, 73 insertions(+), 69 deletions(-) diff --git a/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java b/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java index 775d999e..28135d15 100644 --- a/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java +++ b/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java @@ -80,19 +80,8 @@ public void measureAllMethods() { } /** - * Publish 10000 metrics with 5 threads. no lock: 0.758 ms/op; RW lock: 2.816 ms/op; S lock: - * 2.247 ms/op - * - * @throws InterruptedException - */ - @Benchmark - public void measurePutMetricWith5Threads() throws InterruptedException { - measurePutMetricWithNThreads(5); - } - - /** - * Publish 10000 metrics with 10 threads. no lock: 0.949 ms/op; RW lock: 3.823 ms/op; S lock: - * 3.078 ms/op + * Each thread publishes 1000 metrics, 10 threads in total. + * no lock: 0.949 ms/op; RW lock: 3.823 ms/op; S lock: 3.078 ms/op * * @throws InterruptedException */ @@ -102,8 +91,8 @@ public void measurePutMetricWith10Threads() throws InterruptedException { } /** - * Publish 10000 metrics with 20 threads. no lock: 1.610 ms/op; RW lock: 3.349 ms/op; S lock: - * 2.644 ms/op + * Each thread publishes 1000 metrics, 20 threads in total. + * no lock: 1.860 ms/op; RW lock: 9.806 ms/op; S lock: 7.929 ms/op * * @throws InterruptedException */ @@ -113,8 +102,8 @@ public void measurePutMetricWith20Threads() throws InterruptedException { } /** - * Publish 10000 metrics with 50 threads. no lock: 4.161 ms/op; RW lock: 4.107 ms/op; S lock: - * 4.184 ms/op + * Each thread publishes 1000 metrics, 50 threads in total. + * no lock: 6.548 ms/op; RW lock: 28.754 ms/op; S lock: 24.700 ms/op * * @throws InterruptedException */ @@ -124,30 +113,32 @@ public void measurePutMetricWith50Threads() throws InterruptedException { } /** - * Publish 10000 metrics with 100 threads. no lock: 8.648 ms/op; RW lock: 9.071 ms/op; S lock: - * 8.576 ms/op + * Each thread publishes 1000 metrics, 200 threads in total. + * no lock: 37.662 ms/op; RW lock: 135.824 ms/op; S lock: 114.467 ms/op * * @throws InterruptedException */ @Benchmark - public void measurePutMetricWith100Threads() throws InterruptedException { - measurePutMetricWithNThreads(100); + public void measurePutMetricWith200Threads() throws InterruptedException { + measurePutMetricWithNThreads(200); } /** - * Flush 1000 times with 5 threads. no lock: 7.529 ms/op; RW lock: 22.742 ms/op; S lock: 23.304 - * ms/op + * Each thread publishes 1000 metrics, 500 threads in total. + * no lock: 90.148 ms/op; RW lock: 345.197 ms/op; S lock: 287.908 ms/op * * @throws InterruptedException */ @Benchmark - public void measureFlushWith5Threads() throws InterruptedException { - measureFlushWithNThreads(5); + @Warmup(time = 10) + @Measurement(time = 10) + public void measurePutMetricWith500Threads() throws InterruptedException { + measurePutMetricWithNThreads(500); } /** - * Flush 1000 times with 10 threads. no lock: 12.900 ms/op; RW lock: 25.015 ms/op; S lock: - * 24.778 ms/op + * Each thread flushes 100 times, 10 threads in total. + * no lock: 12.900 ms/op; RW lock: 25.015 ms/op; S lock: 24.778 ms/op * * @throws InterruptedException */ @@ -157,8 +148,8 @@ public void measureFlushWith10Threads() throws InterruptedException { } /** - * Flush 1000 times with 20 threads. no lock: 6.537 ms/op; RW lock: 25.705 ms/op; S lock: 26.465 - * ms/op + * Each thread flushes 100 times, 20 threads in total. + * no lock: 20.824 ms/op; RW lock: 47.123 ms/op; S lock: 48.511 ms/op * * @throws InterruptedException */ @@ -168,8 +159,8 @@ public void measureFlushWith20Threads() throws InterruptedException { } /** - * Flush 1000 times with 50 threads. no lock: 24.985 ms/op; RW lock: 31.453 ms/op; S lock: - * 31.965 ms/op + * Each thread flushes 100 times, 50 threads in total. + * no lock: 77.463 ms/op; RW lock: 121.857 ms/op; S lock: 125.212 ms/op * * @throws InterruptedException */ @@ -179,71 +170,83 @@ public void measureFlushWith50Threads() throws InterruptedException { } /** - * Flush 1000 times with 100 threads. no lock: 34.527 ms/op; RW lock: 39.606 ms/op; S lock: - * 40.007 ms/op + * Each thread flushes 100 times, 200 threads in total. + * no lock: 390.252 ms/op; RW lock: 474.439 ms/op; S lock: 488.809 ms/op * * @throws InterruptedException */ @Benchmark - public void measureFlushWith100Threads() throws InterruptedException { - measureFlushWithNThreads(100); + public void measureFlushWith200Threads() throws InterruptedException { + measureFlushWithNThreads(200); } /** - * Flush 1000 times with 1000 threads. no lock: 116.047 ms/op; RW lock: 141.227 ms/op; S lock: - * 141.597 ms/op + * Each thread flushes 100 times, 500 threads in total. + * no lock: 300.280 ms/op; RW lock: 1161.098 ms/op; S lock: 1247.972 ms/op * * @throws InterruptedException */ @Benchmark @Warmup(time = 10) @Measurement(time = 10) - public void measureFlushWith1000Threads() throws InterruptedException { - measureFlushWithNThreads(1000); + public void measureFlushWith500Threads() throws InterruptedException { + measureFlushWithNThreads(500); } /** - * Execute all methods for 1000 times with 5 threads. no lock (need to sync getAllDimensions() & - * getAllDimensionKeys() in MetricsDirective): 84.041 ± 24.965 ms/op; RW lock: 264.439 ± 8.070 - * ms/op; S lock: 264.630 ± 24.252 ms/op + * Each thread executes all methods 100 times, 10 threads in total. + * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 7.215 ms/op; + * RW lock: 32.159; S lock: 34.226 * * @throws InterruptedException */ @Benchmark - public void measureAllMethodsWith5Threads() throws InterruptedException { - measureAllMethodsWithNThreads(5); + public void measureAllMethodsWith10Threads() throws InterruptedException { + measureAllMethodsWithNThreads(10); } /** - * Execute all methods for 1000 times with 10 threads. no lock (need to sync getAllDimensions() - * & getAllDimensionKeys() in MetricsDirective): 41.174 ± 6.084 ms/op; RW lock: 263.103 ± 15.141 - * ms/op; S lock: 256.267 ± 30.922 ms/op + * Each thread executes all methods 100 times, 20 threads in total. + * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 11.833 ms/op; + * RW lock: 60.510 ms/op; S lock: 75.125 ms/op * * @throws InterruptedException */ @Benchmark - public void measureAllMethodsWith10Threads() throws InterruptedException { - measureAllMethodsWithNThreads(10); + public void measureAllMethodsWith20Threads() throws InterruptedException { + measureAllMethodsWithNThreads(20); } /** - * Execute all methods for 1000 times with 100 threads. no lock (need to sync getAllDimensions() - * & getAllDimensionKeys() in MetricsDirective): 35.779 ± 2.414 ms/op; RW lock: 315.340 ± 16.074 - * ms/op; S lock: 288.459 ± 5.801 ms/op + * Each thread executes all methods 100 times, 50 threads in total. + * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 36.051 ms/op; + * RW lock: 150.022 ms/op; S lock: 244.934 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measureAllMethodsWith50Threads() throws InterruptedException { + measureAllMethodsWithNThreads(50); + } + + /** + * Each thread executes all methods 100 times, 200 threads in total. + * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 108.775 ms/op; + * RW lock: 629.826 ms/op; S lock: 1220.959 ms/op * * @throws InterruptedException */ @Benchmark @Warmup(time = 10) @Measurement(time = 10) - public void measureAllMethodsWith100Threads() throws InterruptedException { - measureAllMethodsWithNThreads(100); + public void measureAllMethodsWith200Threads() throws InterruptedException { + measureAllMethodsWithNThreads(200); } /** - * Execute all methods for 1000 times with 500 threads. no lock (need to sync getAllDimensions() - * & getAllDimensionKeys() in MetricsDirective): 81.785 ± 11.616 ms/op; RW lock: 346.697 ± - * 51.133 ms/op; S lock: 368.981 ± 161.049 ms/op + * Each thread executes all methods 100 times, 500 threads in total. + * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 335.183 ms/op; + * RW lock: 1741.003 ms/op; S lock: 4192.327 ms/op * * @throws InterruptedException */ @@ -255,15 +258,15 @@ public void measureAllMethodsWith500Threads() throws InterruptedException { } /** - * Execute all methods for 1000 times with 1000 threads. no lock (need to sync - * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 218.505 ± 178.808 ms/op; RW - * lock: 436.380 ± 317.130 ms/op; S lock: 390.074 ± 100 ms/op + * Each thread executes all methods 100 times, 1000 threads in total. + * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 575.339 ms/op; + * RW lock: 3230.403 ms/op; S lock: 13519.459 ms/op * * @throws InterruptedException */ @Benchmark - @Warmup(time = 10) - @Measurement(time = 10) + @Warmup(time = 20) + @Measurement(time = 20) public void measureAllMethodsWith1000Threads() throws InterruptedException { measureAllMethodsWithNThreads(1000); } @@ -271,10 +274,10 @@ public void measureAllMethodsWith1000Threads() throws InterruptedException { private void measurePutMetricWithNThreads(int n) throws InterruptedException { logger = new MetricsLogger(envProvider); Thread[] threads = new Thread[n]; + int batchSize = 1000; for (int i = 0; i < n; i++) { final int id = i; - int batchSize = 10000 / n; threads[i] = new Thread( () -> { @@ -293,10 +296,10 @@ private void measurePutMetricWithNThreads(int n) throws InterruptedException { private void measureFlushWithNThreads(int n) throws InterruptedException { logger = new MetricsLogger(envProvider); Thread[] threads = new Thread[n]; + int batchSize = 100; for (int i = 0; i < n; i++) { final int id = i; - int batchSize = 1000 / n; threads[i] = new Thread( () -> { @@ -317,19 +320,20 @@ private void measureFlushWithNThreads(int n) throws InterruptedException { private void measureAllMethodsWithNThreads(int n) throws InterruptedException { logger = new MetricsLogger(envProvider); Thread[] threads = new Thread[n]; + int batchSize = 100; for (int i = 0; i < n; i++) { final int id = i; - int batchSize = 1000 / n; threads[i] = new Thread( () -> { for (int j = batchSize * id; j < batchSize * id + batchSize; j++) { - logger.putMetadata("MetaData-" + id, id); - logger.putProperty("Property-" + id, id); - logger.putDimensions( - DimensionSet.of("Dim-" + id, String.valueOf(id))); logger.putMetric("Metric-" + j, j); + logger.putProperty("Property-" + j, j); + logger.putMetadata("MetaData-" + j, j); + logger.setDimensions( + DimensionSet.of("Dim-" + j, String.valueOf(j))); + logger.flush(); } }); From 6151ece562049725751901e324891aeb5d485926 Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Mon, 1 Aug 2022 16:49:15 -0400 Subject: [PATCH 08/14] Added benchmark section in README --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index bff04ff9..156a1759 100644 --- a/README.md +++ b/README.md @@ -362,6 +362,14 @@ To auto fix code style, run ./gradlew :spotlessApply ``` +### Benchmark + +We use [JMH](https://github.com/openjdk/jmh) as our framework for concurrency performance benchmarking. Benchmarks can be run by: +``` +./gradlew jmh +``` +To run a single benchmark, consider using JMH plugins. For example, [JMH plugin for IntelliJ IDEA](https://github.com/artyushov/idea-jmh-plugin) + ## License This project is licensed under the Apache-2.0 License. From 0a8e009a09853cb6a959718f823a6fef01d4e052 Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Mon, 1 Aug 2022 18:07:29 -0400 Subject: [PATCH 09/14] Added documentation on synchronization policy --- README.md | 18 ++++++++++++++++++ .../emf/logger/MetricsLogger.java | 5 +++++ 2 files changed, 23 insertions(+) diff --git a/README.md b/README.md index 156a1759..131cb056 100644 --- a/README.md +++ b/README.md @@ -316,6 +316,24 @@ config.setAgentEndpoint("udp://127.0.0.1:1000"); AWS_EMF_AGENT_ENDPOINT="udp://127.0.0.1:1000" ``` +## Thread-safety + +### Internal Synchronization + +The MetricsLogger class is thread-safe. Specifically, the generalized multi-threading use cases for this library are: + +1. Collect some metrics or metadata on a single MetricsLogger; Pass the logger into one or more async contexts where new metrics or metadata can be added concurrently; Join the async contexts (e.g. Future.get()) and flush the metrics. +2. Collect some metrics or metadata on a single MetricsLogger; Pass the logger into an async context; Flush from the async context concurrently. + +Thread-safety for the first use case is achieved by introducing concurrent internal data structures and atomic operations associated with these models, to ensure the access to shared mutable resources are always synchronized. + +Thread-safety for the second use case is achieved by using a ReentrantReadWriteLock. This lock is used to create an internal sync context for flush() method in multi-threading situations. `flush()` acquires write lock, while other methods (which have access to mutable shared data with `flush()`) acquires read lock. This makes sure `flush()` is always executed exclusively, while other methods can be executed concurrently. + +### Use Cases that are Not Covered + +With all the internal synchronization measures, however, there're still certain multi-threading use cases that are not covered by this library, which might require external synchronizations or other protection measures. +This is due to the fact that the execution order of APIs are not determined in async contexts. For example, if user needs to associate a given set of properties with a metric in each thread, the results are not guaranteed since the execution order of `putProperty()` is not determined across threads. In such cases, we recommend using a different MetricsLogger instance for different threads, so that no resources are shared and no thread-safety problem would ever happen. Note that this can often be simplified by using a ThreadLocal variable. + ## Examples Check out the [examples](https://github.com/awslabs/aws-embedded-metrics-java/tree/master/examples) directory to get started. diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java index cfcdbab1..88e61f80 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java @@ -38,6 +38,11 @@ public class MetricsLogger { private MetricsContext context; private CompletableFuture environmentFuture; private EnvironmentProvider environmentProvider; + /** + * This lock is used to create an internal sync context for flush() method in multi-threaded situations. + * Flush() acquires write lock, other methods (accessing mutable shared data with flush()) acquires read lock. + * This makes sure flush() is executed exclusively, while other methods can be executed concurrently. + */ private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public MetricsLogger() { From 3ae12850278ebd09a75359351094be4a6f378f88 Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Mon, 1 Aug 2022 19:20:23 -0400 Subject: [PATCH 10/14] Adjusted the code format --- .../emf/MetricsLoggerBenchmark.java | 76 +++++++++---------- .../emf/logger/MetricsLogger.java | 53 +++++++------ .../logger/MetricsLoggerThreadSafetyTest.java | 14 +++- .../MetricDirectiveThreadSafetyTest.java | 11 ++- 4 files changed, 86 insertions(+), 68 deletions(-) diff --git a/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java b/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java index 28135d15..7b273a6c 100644 --- a/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java +++ b/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java @@ -80,8 +80,8 @@ public void measureAllMethods() { } /** - * Each thread publishes 1000 metrics, 10 threads in total. - * no lock: 0.949 ms/op; RW lock: 3.823 ms/op; S lock: 3.078 ms/op + * Each thread publishes 1000 metrics, 10 threads in total. no lock: 0.949 ms/op; RW lock: 3.823 + * ms/op; S lock: 3.078 ms/op * * @throws InterruptedException */ @@ -91,8 +91,8 @@ public void measurePutMetricWith10Threads() throws InterruptedException { } /** - * Each thread publishes 1000 metrics, 20 threads in total. - * no lock: 1.860 ms/op; RW lock: 9.806 ms/op; S lock: 7.929 ms/op + * Each thread publishes 1000 metrics, 20 threads in total. no lock: 1.860 ms/op; RW lock: 9.806 + * ms/op; S lock: 7.929 ms/op * * @throws InterruptedException */ @@ -102,8 +102,8 @@ public void measurePutMetricWith20Threads() throws InterruptedException { } /** - * Each thread publishes 1000 metrics, 50 threads in total. - * no lock: 6.548 ms/op; RW lock: 28.754 ms/op; S lock: 24.700 ms/op + * Each thread publishes 1000 metrics, 50 threads in total. no lock: 6.548 ms/op; RW lock: + * 28.754 ms/op; S lock: 24.700 ms/op * * @throws InterruptedException */ @@ -113,8 +113,8 @@ public void measurePutMetricWith50Threads() throws InterruptedException { } /** - * Each thread publishes 1000 metrics, 200 threads in total. - * no lock: 37.662 ms/op; RW lock: 135.824 ms/op; S lock: 114.467 ms/op + * Each thread publishes 1000 metrics, 200 threads in total. no lock: 37.662 ms/op; RW lock: + * 135.824 ms/op; S lock: 114.467 ms/op * * @throws InterruptedException */ @@ -124,8 +124,8 @@ public void measurePutMetricWith200Threads() throws InterruptedException { } /** - * Each thread publishes 1000 metrics, 500 threads in total. - * no lock: 90.148 ms/op; RW lock: 345.197 ms/op; S lock: 287.908 ms/op + * Each thread publishes 1000 metrics, 500 threads in total. no lock: 90.148 ms/op; RW lock: + * 345.197 ms/op; S lock: 287.908 ms/op * * @throws InterruptedException */ @@ -137,8 +137,8 @@ public void measurePutMetricWith500Threads() throws InterruptedException { } /** - * Each thread flushes 100 times, 10 threads in total. - * no lock: 12.900 ms/op; RW lock: 25.015 ms/op; S lock: 24.778 ms/op + * Each thread flushes 100 times, 10 threads in total. no lock: 12.900 ms/op; RW lock: 25.015 + * ms/op; S lock: 24.778 ms/op * * @throws InterruptedException */ @@ -148,8 +148,8 @@ public void measureFlushWith10Threads() throws InterruptedException { } /** - * Each thread flushes 100 times, 20 threads in total. - * no lock: 20.824 ms/op; RW lock: 47.123 ms/op; S lock: 48.511 ms/op + * Each thread flushes 100 times, 20 threads in total. no lock: 20.824 ms/op; RW lock: 47.123 + * ms/op; S lock: 48.511 ms/op * * @throws InterruptedException */ @@ -159,8 +159,8 @@ public void measureFlushWith20Threads() throws InterruptedException { } /** - * Each thread flushes 100 times, 50 threads in total. - * no lock: 77.463 ms/op; RW lock: 121.857 ms/op; S lock: 125.212 ms/op + * Each thread flushes 100 times, 50 threads in total. no lock: 77.463 ms/op; RW lock: 121.857 + * ms/op; S lock: 125.212 ms/op * * @throws InterruptedException */ @@ -170,8 +170,8 @@ public void measureFlushWith50Threads() throws InterruptedException { } /** - * Each thread flushes 100 times, 200 threads in total. - * no lock: 390.252 ms/op; RW lock: 474.439 ms/op; S lock: 488.809 ms/op + * Each thread flushes 100 times, 200 threads in total. no lock: 390.252 ms/op; RW lock: 474.439 + * ms/op; S lock: 488.809 ms/op * * @throws InterruptedException */ @@ -181,8 +181,8 @@ public void measureFlushWith200Threads() throws InterruptedException { } /** - * Each thread flushes 100 times, 500 threads in total. - * no lock: 300.280 ms/op; RW lock: 1161.098 ms/op; S lock: 1247.972 ms/op + * Each thread flushes 100 times, 500 threads in total. no lock: 300.280 ms/op; RW lock: + * 1161.098 ms/op; S lock: 1247.972 ms/op * * @throws InterruptedException */ @@ -194,9 +194,9 @@ public void measureFlushWith500Threads() throws InterruptedException { } /** - * Each thread executes all methods 100 times, 10 threads in total. - * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 7.215 ms/op; - * RW lock: 32.159; S lock: 34.226 + * Each thread executes all methods 100 times, 10 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 7.215 ms/op; RW lock: + * 32.159; S lock: 34.226 * * @throws InterruptedException */ @@ -206,9 +206,9 @@ public void measureAllMethodsWith10Threads() throws InterruptedException { } /** - * Each thread executes all methods 100 times, 20 threads in total. - * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 11.833 ms/op; - * RW lock: 60.510 ms/op; S lock: 75.125 ms/op + * Each thread executes all methods 100 times, 20 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 11.833 ms/op; RW lock: + * 60.510 ms/op; S lock: 75.125 ms/op * * @throws InterruptedException */ @@ -218,9 +218,9 @@ public void measureAllMethodsWith20Threads() throws InterruptedException { } /** - * Each thread executes all methods 100 times, 50 threads in total. - * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 36.051 ms/op; - * RW lock: 150.022 ms/op; S lock: 244.934 ms/op + * Each thread executes all methods 100 times, 50 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 36.051 ms/op; RW lock: + * 150.022 ms/op; S lock: 244.934 ms/op * * @throws InterruptedException */ @@ -230,9 +230,9 @@ public void measureAllMethodsWith50Threads() throws InterruptedException { } /** - * Each thread executes all methods 100 times, 200 threads in total. - * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 108.775 ms/op; - * RW lock: 629.826 ms/op; S lock: 1220.959 ms/op + * Each thread executes all methods 100 times, 200 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 108.775 ms/op; RW lock: + * 629.826 ms/op; S lock: 1220.959 ms/op * * @throws InterruptedException */ @@ -244,9 +244,9 @@ public void measureAllMethodsWith200Threads() throws InterruptedException { } /** - * Each thread executes all methods 100 times, 500 threads in total. - * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 335.183 ms/op; - * RW lock: 1741.003 ms/op; S lock: 4192.327 ms/op + * Each thread executes all methods 100 times, 500 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 335.183 ms/op; RW lock: + * 1741.003 ms/op; S lock: 4192.327 ms/op * * @throws InterruptedException */ @@ -258,9 +258,9 @@ public void measureAllMethodsWith500Threads() throws InterruptedException { } /** - * Each thread executes all methods 100 times, 1000 threads in total. - * no lock (need to sync getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 575.339 ms/op; - * RW lock: 3230.403 ms/op; S lock: 13519.459 ms/op + * Each thread executes all methods 100 times, 1000 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 575.339 ms/op; RW lock: + * 3230.403 ms/op; S lock: 13519.459 ms/op * * @throws InterruptedException */ diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java index 88e61f80..066677c8 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java @@ -20,7 +20,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; - import lombok.extern.slf4j.Slf4j; import software.amazon.cloudwatchlogs.emf.environment.Environment; import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider; @@ -39,9 +38,10 @@ public class MetricsLogger { private CompletableFuture environmentFuture; private EnvironmentProvider environmentProvider; /** - * This lock is used to create an internal sync context for flush() method in multi-threaded situations. - * Flush() acquires write lock, other methods (accessing mutable shared data with flush()) acquires read lock. - * This makes sure flush() is executed exclusively, while other methods can be executed concurrently. + * This lock is used to create an internal sync context for flush() method in multi-threaded + * situations. Flush() acquires write lock, other methods (accessing mutable shared data with + * flush()) acquires read lock. This makes sure flush() is executed exclusively, while other + * methods can be executed concurrently. */ private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); @@ -98,10 +98,11 @@ public void flush() { * @return the current logger */ public MetricsLogger putProperty(String key, Object value) { - return applyReadLock(() -> { - this.context.putProperty(key, value); - return this; - }); + return applyReadLock( + () -> { + this.context.putProperty(key, value); + return this; + }); } /** @@ -116,10 +117,11 @@ public MetricsLogger putProperty(String key, Object value) { * @return the current logger */ public MetricsLogger putDimensions(DimensionSet dimensions) { - return applyReadLock(() -> { - context.putDimension(dimensions); - return this; - }); + return applyReadLock( + () -> { + context.putDimension(dimensions); + return this; + }); } /** @@ -132,10 +134,11 @@ public MetricsLogger putDimensions(DimensionSet dimensions) { * @return the current logger */ public MetricsLogger setDimensions(DimensionSet... dimensionSets) { - return applyReadLock(() -> { - context.setDimensions(dimensionSets); - return this; - }); + return applyReadLock( + () -> { + context.setDimensions(dimensionSets); + return this; + }); } /** @@ -149,10 +152,11 @@ public MetricsLogger setDimensions(DimensionSet... dimensionSets) { * @return the current logger */ public MetricsLogger putMetric(String key, double value, Unit unit) { - return applyReadLock(() -> { - this.context.putMetric(key, value, unit); - return this; - }); + return applyReadLock( + () -> { + this.context.putMetric(key, value, unit); + return this; + }); } /** @@ -180,10 +184,11 @@ public MetricsLogger putMetric(String key, double value) { * @return the current logger */ public MetricsLogger putMetadata(String key, Object value) { - return applyReadLock(() -> { - this.context.putMetadata(key, value); - return this; - }); + return applyReadLock( + () -> { + this.context.putMetadata(key, value); + return this; + }); } /** diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java index aa3056c7..35011dc8 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java @@ -64,7 +64,9 @@ public void testConcurrentPutProperty() throws InterruptedException { Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); for (int j = 0; j < 1000; j++) { int propertyId = 1000 * id + j; - logger.putProperty("Property-" + propertyId, String.valueOf(propertyId)); + logger.putProperty( + "Property-" + propertyId, + String.valueOf(propertyId)); } } catch (Throwable e) { throwable = e; // ensure no exceptions are thrown @@ -98,7 +100,9 @@ public void testConcurrentPutDimension() throws InterruptedException { Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); for (int j = 0; j < 1000; j++) { int dimensionId = 1000 * id + j; - logger.putDimensions(DimensionSet.of("Dim", String.valueOf(dimensionId))); + logger.putDimensions( + DimensionSet.of( + "Dim", String.valueOf(dimensionId))); } } catch (Throwable e) { throwable = e; @@ -144,7 +148,9 @@ public void testConcurrentPutDimensionAfterSetDimension() throws InterruptedExce Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); for (int j = 0; j < 1000; j++) { int dimensionId = 1000 * id + j + 1; - logger.putDimensions(DimensionSet.of("Dim", String.valueOf(dimensionId))); + logger.putDimensions( + DimensionSet.of( + "Dim", String.valueOf(dimensionId))); } } catch (Throwable e) { throwable = e; @@ -355,7 +361,7 @@ public void testConcurrentFlushAndMethodsOtherThanPutMetric() throws Interrupted @After public void tearDown() throws Throwable { if (throwable != null) throw throwable; - throwable = null; // reset throwable to prevent repeat throwing + throwable = null; // reset throwable to prevent repeat throwing } private Map parseRootNode(String event) throws JsonProcessingException { diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java index f2b0bcde..416089d0 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java @@ -13,7 +13,9 @@ public class MetricDirectiveThreadSafetyTest { public void testConcurrentPutMetricWithDifferentKey() throws InterruptedException { MetricDirective metricDirective = new MetricDirective(); Thread[] threads = new Thread[100]; - long targetTimestampToRun = System.currentTimeMillis() + 500; // all threads should target running on this timestamp + long targetTimestampToRun = + System.currentTimeMillis() + + 500; // all threads should target running on this timestamp for (int i = 0; i < 100; i++) { final int id = i; @@ -21,7 +23,12 @@ public void testConcurrentPutMetricWithDifferentKey() throws InterruptedExceptio new Thread( () -> { try { - Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); // try to make all threads run at same time + Thread.sleep( + targetTimestampToRun + - System.currentTimeMillis()); // try to make + // all threads + // run at same + // time for (int j = 0; j < 1000; j++) { int metricId = 1000 * id + j; metricDirective.putMetric("Metric-" + metricId, metricId); From baea6cde724db04403c30c205a04586f828d5c3e Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Tue, 2 Aug 2022 11:52:05 -0400 Subject: [PATCH 11/14] retry integ test --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 131cb056..3f015123 100644 --- a/README.md +++ b/README.md @@ -320,7 +320,7 @@ AWS_EMF_AGENT_ENDPOINT="udp://127.0.0.1:1000" ### Internal Synchronization -The MetricsLogger class is thread-safe. Specifically, the generalized multi-threading use cases for this library are: +The MetricsLogger class is thread-safe. Specifically, the generalized multi-threading use cases for this library are: 1. Collect some metrics or metadata on a single MetricsLogger; Pass the logger into one or more async contexts where new metrics or metadata can be added concurrently; Join the async contexts (e.g. Future.get()) and flush the metrics. 2. Collect some metrics or metadata on a single MetricsLogger; Pass the logger into an async context; Flush from the async context concurrently. From b4af597862c6cac521c600c9bfe8291518f09659 Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Tue, 2 Aug 2022 11:58:12 -0400 Subject: [PATCH 12/14] code format --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3f015123..131cb056 100644 --- a/README.md +++ b/README.md @@ -320,7 +320,7 @@ AWS_EMF_AGENT_ENDPOINT="udp://127.0.0.1:1000" ### Internal Synchronization -The MetricsLogger class is thread-safe. Specifically, the generalized multi-threading use cases for this library are: +The MetricsLogger class is thread-safe. Specifically, the generalized multi-threading use cases for this library are: 1. Collect some metrics or metadata on a single MetricsLogger; Pass the logger into one or more async contexts where new metrics or metadata can be added concurrently; Join the async contexts (e.g. Future.get()) and flush the metrics. 2. Collect some metrics or metadata on a single MetricsLogger; Pass the logger into an async context; Flush from the async context concurrently. From 9a9160b52745579bf6cebddb4f4b72a51c869ffb Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Tue, 2 Aug 2022 15:40:43 -0400 Subject: [PATCH 13/14] made a minor change --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 131cb056..a31e5fdf 100644 --- a/README.md +++ b/README.md @@ -300,7 +300,7 @@ Example: AWS_EMF_ENVIRONMENT="Local" ``` -**AgentEndpoint**: For agent-based platforms, you may optionally configure the endpoint to reach the agent on. +**AgentEndpoint**: For agent-based platforms, you may optionally configure the endpoint to reach the agent on. Example: From b6c25517fc4bb9bb9c36a80b3c4fd6aeabf7805c Mon Sep 17 00:00:00 2001 From: Stephen-Bao Date: Tue, 2 Aug 2022 15:41:05 -0400 Subject: [PATCH 14/14] rerun integ test --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a31e5fdf..131cb056 100644 --- a/README.md +++ b/README.md @@ -300,7 +300,7 @@ Example: AWS_EMF_ENVIRONMENT="Local" ``` -**AgentEndpoint**: For agent-based platforms, you may optionally configure the endpoint to reach the agent on. +**AgentEndpoint**: For agent-based platforms, you may optionally configure the endpoint to reach the agent on. Example: