Skip to content

Commit aaa10c6

Browse files
authored
Add support for asynchronous flushing (awslabs#82)
1 parent ab1f24a commit aaa10c6

34 files changed

+727
-91
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ build
1111
out
1212
.settings
1313
.temp
14+
bin

README.md

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
Generate CloudWatch metrics embedded within structured log events. The embedded metrics will be extracted so that you can visualize and alarm on them for real-time incident detection. This allows you to monitor aggregated values while preserving the detailed log event context that generates them.
77
- [Use Cases](#use-cases)
88
- [Usage](#usage)
9+
- [Graceful Shutdown](#graceful-shutdown)
910
- [API](#api)
1011
- [Examples](#examples)
1112
- [Development](#development)
@@ -25,7 +26,6 @@ Generate CloudWatch metrics embedded within structured log events. The embedded
2526

2627
To use a metric logger, you need to manually create and flush the logger.
2728

28-
2929
```java
3030
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
3131
import software.amazon.cloudwatchlogs.emf.model.DimensionSet;
@@ -44,6 +44,30 @@ class Example {
4444

4545
You can find the artifact location and examples of how to include it in your project at [Maven Central](https://search.maven.org/artifact/software.amazon.cloudwatchlogs/aws-embedded-metrics)
4646

47+
## Graceful Shutdown
48+
49+
**Since:** 2.0.0-beta-1
50+
51+
In any environment, other than AWS Lambda, we recommend running an out-of-process agent (the CloudWatch Agent or
52+
FireLens / Fluent-Bit) to collect the EMF events. When using an out-of-process agent, this package will buffer the data
53+
asynchronously in process to handle any transient communication issues with the agent. This means that when the `MetricsLogger`
54+
gets flushed, data may not be safely persisted yet. To gracefully shutdown the environment, you can call shutdown on the
55+
environment's sink. A full example can be found in the [`examples`](examples) directory.
56+
57+
```java
58+
// create an environment singleton, this should be re-used across loggers
59+
DefaultEnvironment environment = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
60+
61+
MetricsLogger logger = new MetricsLogger(environment);
62+
logger.setDimensions(DimensionSet.of("Operation", "ProcessRecords"));
63+
logger.putMetric("ExampleMetric", 100, Unit.MILLISECONDS);
64+
logger.putProperty("RequestId", "422b1569-16f6-4a03-b8f0-fe3fd9b100f8");
65+
logger.flush();
66+
67+
// flush the sink, waiting up to 10s before giving up
68+
environment.getSink().shutdown().orTimeout(10_000L, TimeUnit.MILLISECONDS);
69+
```
70+
4771
## API
4872

4973
### MetricsLogger

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ allprojects {
2828
targetCompatibility = '1.8'
2929
}
3030

31-
version = '1.0.4'
31+
version = '2.0.0-beta-1'
3232
}
3333

3434
java {
@@ -64,6 +64,7 @@ dependencies {
6464
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.11.1'
6565
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.11.1'
6666
implementation 'org.slf4j:slf4j-api:1.7.30'
67+
implementation 'org.javatuples:javatuples:1.2'
6768

6869
// Use JUnit test framework
6970
testImplementation 'software.amazon.awssdk:cloudwatch:2.13.54'

buildspecs/buildspec.canary.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ env:
99
phases:
1010
install:
1111
runtime-versions:
12-
java: corretto8
12+
java: corretto11
1313
commands:
1414
# start docker
1515
# https://docs.aws.amazon.com/codebuild/latest/userguide/sample-docker-custom-image.html#sample-docker-custom-image-files

buildspecs/buildspec.release.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ env:
1111
phases:
1212
install:
1313
runtime-versions:
14-
java: corretto8
14+
java: corretto11
1515
build:
1616
commands:
1717
- ./gradlew publish

buildspecs/buildspec.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ env:
99
phases:
1010
install:
1111
runtime-versions:
12-
java: corretto8
12+
java: corretto11
1313
commands:
1414
# start docker
1515
# https://docs.aws.amazon.com/codebuild/latest/userguide/sample-docker-custom-image.html#sample-docker-custom-image-files

examples/agent/src/main/java/agent/App.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,27 @@
11
package agent;
22

3+
import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider;
4+
import software.amazon.cloudwatchlogs.emf.environment.DefaultEnvironment;
5+
import software.amazon.cloudwatchlogs.emf.environment.Environment;
36
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
47
import software.amazon.cloudwatchlogs.emf.model.DimensionSet;
58
import software.amazon.cloudwatchlogs.emf.model.Unit;
69

10+
import java.util.concurrent.TimeUnit;
11+
712
public class App {
813

914
public static void main(String[] args) {
10-
MetricsLogger logger = new MetricsLogger();
11-
logger.putDimensions(DimensionSet.of("Operation", "Agent"));
15+
DefaultEnvironment environment = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
16+
emitMetric(environment);
17+
emitMetric(environment);
18+
emitMetric(environment);
19+
environment.getSink().shutdown().orTimeout(360_000L, TimeUnit.MILLISECONDS);
20+
}
21+
22+
private static void emitMetric(Environment environment) {
23+
MetricsLogger logger = new MetricsLogger(environment);
24+
logger.setDimensions(DimensionSet.of("Operation", "Agent"));
1225
logger.putMetric("ExampleMetric", 100, Unit.MILLISECONDS);
1326
logger.putProperty("RequestId", "422b1569-16f6-4a03-b8f0-fe3fd9b100f8");
1427
logger.flush();

examples/ecs-firelens/src/main/java/App.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,25 @@
1717
import com.sun.net.httpserver.HttpExchange;
1818
import com.sun.net.httpserver.HttpHandler;
1919
import com.sun.net.httpserver.HttpServer;
20+
import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider;
21+
import software.amazon.cloudwatchlogs.emf.environment.ECSEnvironment;
22+
import software.amazon.cloudwatchlogs.emf.environment.Environment;
2023
import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider;
2124
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
2225
import software.amazon.cloudwatchlogs.emf.model.Unit;
26+
import sun.misc.Signal;
2327

2428
import java.io.IOException;
2529
import java.io.OutputStream;
2630
import java.net.InetSocketAddress;
31+
import java.util.concurrent.TimeUnit;
2732

2833
public class App {
2934

35+
private static final Environment env = new ECSEnvironment(EnvironmentConfigurationProvider.getConfig());
36+
3037
public static void main(String[] args) throws Exception {
38+
registerShutdownHook();
3139

3240
int portNumber = 8000;
3341
HttpServer server = HttpServer.create(new InetSocketAddress(8000), 0);
@@ -37,6 +45,14 @@ public static void main(String[] args) throws Exception {
3745
server.start();
3846
}
3947

48+
private static void registerShutdownHook() {
49+
// https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/
50+
Signal.handle(new Signal("TERM"), sig -> {
51+
env.getSink().shutdown().orTimeout(1_000L, TimeUnit.MILLISECONDS);
52+
System.exit(0);
53+
});
54+
}
55+
4056
static class SimpleHandler implements HttpHandler {
4157
@Override
4258
public void handle(HttpExchange he) throws IOException {

src/integration-test/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerIntegrationTest.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
import software.amazon.awssdk.services.cloudwatch.model.Statistic;
3232
import software.amazon.cloudwatchlogs.emf.config.Configuration;
3333
import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider;
34-
import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider;
34+
import software.amazon.cloudwatchlogs.emf.environment.DefaultEnvironment;
35+
import software.amazon.cloudwatchlogs.emf.environment.Environment;
3536
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
3637
import software.amazon.cloudwatchlogs.emf.model.DimensionSet;
3738
import software.amazon.cloudwatchlogs.emf.model.Unit;
@@ -56,56 +57,64 @@ public void setUp() {
5657

5758
@Test(timeout = 120_000)
5859
public void testSingleFlushOverTCP() throws InterruptedException {
60+
Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
5961
String metricName = "TCP-SingleFlush";
6062
int expectedSamples = 1;
6163
config.setAgentEndpoint("tcp://127.0.0.1:25888");
6264

63-
logMetric(metricName);
65+
logMetric(env, metricName);
66+
env.getSink().shutdown().join();
6467

6568
assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples));
6669
}
6770

6871
@Test(timeout = 300_000)
6972
public void testMultipleFlushesOverTCP() throws InterruptedException {
73+
Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
7074
String metricName = "TCP-MultipleFlushes";
7175
int expectedSamples = 3;
7276
config.setAgentEndpoint("tcp://127.0.0.1:25888");
7377

74-
logMetric(metricName);
75-
logMetric(metricName);
78+
logMetric(env, metricName);
79+
logMetric(env, metricName);
7680
Thread.sleep(500);
77-
logMetric(metricName);
81+
logMetric(env, metricName);
82+
env.getSink().shutdown().join();
7883

7984
assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples));
8085
}
8186

8287
@Test(timeout = 120_000)
8388
public void testSingleFlushOverUDP() throws InterruptedException {
89+
Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
8490
String metricName = "UDP-SingleFlush";
8591
int expectedSamples = 1;
8692
config.setAgentEndpoint("udp://127.0.0.1:25888");
8793

88-
logMetric(metricName);
94+
logMetric(env, metricName);
95+
env.getSink().shutdown().join();
8996

9097
assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples));
9198
}
9299

93100
@Test(timeout = 300_000)
94101
public void testMultipleFlushOverUDP() throws InterruptedException {
102+
Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
95103
String metricName = "UDP-MultipleFlush";
96104
int expectedSamples = 3;
97105
config.setAgentEndpoint("udp://127.0.0.1:25888");
98106

99-
logMetric(metricName);
100-
logMetric(metricName);
107+
logMetric(env, metricName);
108+
logMetric(env, metricName);
101109
Thread.sleep(500);
102-
logMetric(metricName);
110+
logMetric(env, metricName);
111+
env.getSink().shutdown().join();
103112

104113
assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples));
105114
}
106115

107-
private void logMetric(String metricName) {
108-
MetricsLogger logger = new MetricsLogger(new EnvironmentProvider());
116+
private void logMetric(Environment env, String metricName) {
117+
MetricsLogger logger = new MetricsLogger(env);
109118
logger.putDimensions(dimensions);
110119
logger.putMetric(metricName, 100, Unit.MILLISECONDS);
111120
logger.flush();

src/main/java/software/amazon/cloudwatchlogs/emf/Constants.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,27 @@ public class Constants {
2424
public static final int MAX_METRICS_PER_EVENT = 100;
2525

2626
public static final int MAX_DATAPOINTS_PER_METRIC = 100;
27+
28+
/**
29+
* The max number of messages to hold in memory in case of transient socket errors. The maximum
30+
* message size is 256 KB meaning the maximum size of this buffer would be 25,600 MB
31+
*/
32+
public static final int DEFAULT_ASYNC_BUFFER_SIZE = 100;
33+
34+
/**
35+
* How many times to retry an individual message. We eventually give up vs. retrying
36+
* indefinitely in case there is something inherent to the message that is causing the failures.
37+
* Giving up results in data loss, but also helps us reduce the risk of a poison pill blocking
38+
* all process telemetry.
39+
*/
40+
public static final int MAX_ATTEMPTS_PER_MESSAGE = 100;
41+
42+
/** Starting backoff millis when a transient socket failure is encountered. */
43+
public static final int MIN_BACKOFF_MILLIS = 50;
44+
45+
/** Max backoff millis when a transient socket failure is encountered. */
46+
public static final int MAX_BACKOFF_MILLIS = 2000;
47+
48+
/** Maximum amount of random jitter to apply to retries */
49+
public static final int MAX_BACKOFF_JITTER = 20;
2750
}

src/main/java/software/amazon/cloudwatchlogs/emf/config/Configuration.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
import java.util.Optional;
2020
import lombok.AllArgsConstructor;
21+
import lombok.Getter;
2122
import lombok.NoArgsConstructor;
2223
import lombok.Setter;
24+
import software.amazon.cloudwatchlogs.emf.Constants;
2325
import software.amazon.cloudwatchlogs.emf.environment.Environments;
2426
import software.amazon.cloudwatchlogs.emf.util.StringUtils;
2527

@@ -54,6 +56,9 @@ public class Configuration {
5456
*/
5557
@Setter Environments environmentOverride;
5658

59+
/** Queue length for asynchronous sinks. */
60+
@Setter @Getter int asyncBufferSize = Constants.DEFAULT_ASYNC_BUFFER_SIZE;
61+
5762
public Optional<String> getServiceName() {
5863
return getStringOptional(serviceName);
5964
}
@@ -85,6 +90,6 @@ private Optional<String> getStringOptional(String value) {
8590
if (StringUtils.isNullOrEmpty(value)) {
8691
return Optional.empty();
8792
}
88-
return Optional.ofNullable(value);
93+
return Optional.of(value);
8994
}
9095
}

src/main/java/software/amazon/cloudwatchlogs/emf/config/ConfigurationKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@ public class ConfigurationKeys {
2727
public static final String LOG_STREAM_NAME = "LOG_STREAM_NAME";
2828
public static final String AGENT_ENDPOINT = "AGENT_ENDPOINT";
2929
public static final String ENVIRONMENT_OVERRIDE = "ENVIRONMENT";
30+
public static final String ASYNC_BUFFER_SIZE = "ASYNC_BUFFER_SIZE";
3031
}

src/main/java/software/amazon/cloudwatchlogs/emf/config/EnvironmentConfigurationProvider.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package software.amazon.cloudwatchlogs.emf.config;
1818

19+
import software.amazon.cloudwatchlogs.emf.Constants;
1920
import software.amazon.cloudwatchlogs.emf.environment.Environments;
2021
import software.amazon.cloudwatchlogs.emf.util.StringUtils;
2122

@@ -27,21 +28,21 @@ protected EnvironmentConfigurationProvider() {}
2728

2829
public static Configuration getConfig() {
2930
if (config == null) {
30-
config =
31-
new Configuration(
32-
getEnvVar(ConfigurationKeys.SERVICE_NAME),
33-
getEnvVar(ConfigurationKeys.SERVICE_TYPE),
34-
getEnvVar(ConfigurationKeys.LOG_GROUP_NAME),
35-
getEnvVar(ConfigurationKeys.LOG_STREAM_NAME),
36-
getEnvVar(ConfigurationKeys.AGENT_ENDPOINT),
37-
getEnvironmentOverride());
31+
config = createConfig();
3832
}
3933
return config;
4034
}
4135

42-
private static String getEnvVar(String key) {
43-
String name = String.join("", ConfigurationKeys.ENV_VAR_PREFIX, "_", key);
44-
return getEnv(name);
36+
static Configuration createConfig() {
37+
return new Configuration(
38+
getEnvVar(ConfigurationKeys.SERVICE_NAME),
39+
getEnvVar(ConfigurationKeys.SERVICE_TYPE),
40+
getEnvVar(ConfigurationKeys.LOG_GROUP_NAME),
41+
getEnvVar(ConfigurationKeys.LOG_STREAM_NAME),
42+
getEnvVar(ConfigurationKeys.AGENT_ENDPOINT),
43+
getEnvironmentOverride(),
44+
getIntOrDefault(
45+
ConfigurationKeys.ASYNC_BUFFER_SIZE, Constants.DEFAULT_ASYNC_BUFFER_SIZE));
4546
}
4647

4748
private static Environments getEnvironmentOverride() {
@@ -57,6 +58,24 @@ private static Environments getEnvironmentOverride() {
5758
}
5859
}
5960

61+
private static int getIntOrDefault(String key, int defaultValue) {
62+
String value = getEnvVar(key);
63+
if (StringUtils.isNullOrEmpty(value)) {
64+
return defaultValue;
65+
}
66+
67+
try {
68+
return Integer.parseInt(value);
69+
} catch (NumberFormatException e) {
70+
return defaultValue;
71+
}
72+
}
73+
74+
private static String getEnvVar(String key) {
75+
String name = String.join("", ConfigurationKeys.ENV_VAR_PREFIX, "_", key);
76+
return getEnv(name);
77+
}
78+
6079
private static String getEnv(String name) {
6180
return SystemWrapper.getenv(name);
6281
}

0 commit comments

Comments
 (0)