Skip to content

Allow STDOUT overrides for AgentBasedEnvironments #147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,24 @@ config.setAgentEndpoint("udp://127.0.0.1:1000");
AWS_EMF_AGENT_ENDPOINT="udp://127.0.0.1:1000"
```

**WriteToStdout**: For agent-based platforms, setting this configuration to `true` will make the `MetricsLogger` write to `stdout` rather than sending them to the agent. The default value for this configuration is `false`. This configuration has no effect for non-agent-based platforms.

If an `EnvironmentOverride` is provided, this configuration will apply to the overriden environment if the environment is an agent-based platform

Example:

```java
// in process
import software.amazon.cloudwatchlogs.emf.config.Configuration;
import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider;

Configuration config = EnvironmentConfigurationProvider.getConfig();
config.setShouldWriteToStdout(true);

// environment
AWS_EMF_WRITE_TO_STDOUT="true"
```

## Thread-safety

### Internal Synchronization
Expand Down
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ With Docker images, using the `awslogs` log driver will send your container logs
## ECS and Fargate

With ECS and Fargate, you can use the `awsfirelens` (recommended) or `awslogs` log driver to have your logs sent to CloudWatch Logs on your behalf. After configuring the options for your preferred log driver, you may write your EMF logs to STDOUT and they will be processed.
To write your EMF logs to STDOUT, set the environment variable `AWS_EMF_WRITE_TO_STDOUT=true`

[`awsfirelens` documentation](https://github.com/aws/amazon-cloudwatch-logs-for-fluent-bit)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class Configuration {
/** Queue length for asynchronous sinks. */
@Setter @Getter int asyncBufferSize = Constants.DEFAULT_ASYNC_BUFFER_SIZE;

@Setter private boolean shouldWriteToStdout;

public Optional<String> getServiceName() {
return getStringOptional(serviceName);
}
Expand Down Expand Up @@ -92,4 +94,8 @@ private Optional<String> getStringOptional(String value) {
}
return Optional.of(value);
}

public boolean shouldWriteToStdout() {
return shouldWriteToStdout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ public class ConfigurationKeys {
public static final String AGENT_ENDPOINT = "AGENT_ENDPOINT";
public static final String ENVIRONMENT_OVERRIDE = "ENVIRONMENT";
public static final String ASYNC_BUFFER_SIZE = "ASYNC_BUFFER_SIZE";
public static final String WRITE_TO_STDOUT = "WRITE_TO_STDOUT";
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ static Configuration createConfig() {
getEnvVar(ConfigurationKeys.AGENT_ENDPOINT),
getEnvironmentOverride(),
getIntOrDefault(
ConfigurationKeys.ASYNC_BUFFER_SIZE, Constants.DEFAULT_ASYNC_BUFFER_SIZE));
ConfigurationKeys.ASYNC_BUFFER_SIZE, Constants.DEFAULT_ASYNC_BUFFER_SIZE),
Boolean.parseBoolean(getEnvVar(ConfigurationKeys.WRITE_TO_STDOUT)));
}

private static Environments getEnvironmentOverride() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import software.amazon.cloudwatchlogs.emf.Constants;
import software.amazon.cloudwatchlogs.emf.config.Configuration;
import software.amazon.cloudwatchlogs.emf.sinks.AgentSink;
import software.amazon.cloudwatchlogs.emf.sinks.ConsoleSink;
import software.amazon.cloudwatchlogs.emf.sinks.Endpoint;
import software.amazon.cloudwatchlogs.emf.sinks.ISink;
import software.amazon.cloudwatchlogs.emf.sinks.SocketClientFactory;
Expand Down Expand Up @@ -67,27 +68,31 @@ public String getLogStreamName() {
@Override
public ISink getSink() {
if (sink == null) {
Endpoint endpoint;
if (config.getAgentEndpoint().isPresent()) {
endpoint = Endpoint.fromURL(config.getAgentEndpoint().get());
if (config.shouldWriteToStdout()) {
sink = new ConsoleSink();
} else {
log.info(
"Endpoint is not defined. Using default: {}",
Endpoint.DEFAULT_TCP_ENDPOINT);
endpoint = Endpoint.DEFAULT_TCP_ENDPOINT;
Endpoint endpoint;
if (config.getAgentEndpoint().isPresent()) {
endpoint = Endpoint.fromURL(config.getAgentEndpoint().get());
} else {
log.info(
"Endpoint is not defined. Using default: {}",
Endpoint.DEFAULT_TCP_ENDPOINT);
endpoint = Endpoint.DEFAULT_TCP_ENDPOINT;
}
sink =
new AgentSink(
getLogGroupName(),
getLogStreamName(),
endpoint,
new SocketClientFactory(),
config.getAsyncBufferSize(),
() ->
new FibonacciRetryStrategy(
Constants.MIN_BACKOFF_MILLIS,
Constants.MAX_BACKOFF_MILLIS,
Constants.MAX_BACKOFF_JITTER));
}
sink =
new AgentSink(
getLogGroupName(),
getLogStreamName(),
endpoint,
new SocketClientFactory(),
config.getAsyncBufferSize(),
() ->
new FibonacciRetryStrategy(
Constants.MIN_BACKOFF_MILLIS,
Constants.MAX_BACKOFF_MILLIS,
Constants.MAX_BACKOFF_JITTER));
}
return sink;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public void getGetConfig() {
putEnv("AWS_EMF_AGENT_ENDPOINT", "Endpoint");
putEnv("AWS_EMF_ENVIRONMENT", "Agent");
putEnv("AWS_EMF_ASYNC_BUFFER_SIZE", "9999");
putEnv("AWS_EMF_WRITE_TO_STDOUT", "true");

Configuration config = EnvironmentConfigurationProvider.createConfig();

Expand All @@ -50,6 +51,7 @@ public void getGetConfig() {
assertEquals("Endpoint", config.getAgentEndpoint().get());
assertEquals(Environments.Agent, config.getEnvironmentOverride());
assertEquals(9999, config.getAsyncBufferSize());
assertTrue(config.shouldWriteToStdout());
}

@Test
Expand All @@ -59,10 +61,20 @@ public void invalidEnvironmentValuesFallbackToExpectedDefaults() {

// act
putEnv("AWS_EMF_ASYNC_BUFFER_SIZE", "NaN");
putEnv("AWS_EMF_WRITE_TO_STDOUT", "notABool");

// assert
Configuration config = EnvironmentConfigurationProvider.createConfig();
assertEquals(100, config.getAsyncBufferSize());
assertFalse(config.shouldWriteToStdout());
}

@Test
public void emptyEnvironmentValuesFallbackToExpectedDefaults() {
// assert
Configuration config = EnvironmentConfigurationProvider.createConfig();
assertEquals(100, config.getAsyncBufferSize());
assertFalse(config.shouldWriteToStdout());
}

private void putEnv(String key, String value) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package software.amazon.cloudwatchlogs.emf.environment;

import static org.junit.Assert.assertEquals;
import static org.powermock.api.mockito.PowerMockito.mock;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import software.amazon.cloudwatchlogs.emf.config.Configuration;
import software.amazon.cloudwatchlogs.emf.config.SystemWrapper;
import software.amazon.cloudwatchlogs.emf.model.MetricsContext;
import software.amazon.cloudwatchlogs.emf.sinks.AgentSink;
import software.amazon.cloudwatchlogs.emf.sinks.ConsoleSink;
import software.amazon.cloudwatchlogs.emf.sinks.Endpoint;
import software.amazon.cloudwatchlogs.emf.sinks.ISink;

@RunWith(PowerMockRunner.class)
@PrepareForTest({SystemWrapper.class, AgentBasedEnvironment.class})
public class AgentBasedEnvironmentTest {
public static class AgentBasedEnvironmentTestImplementation extends AgentBasedEnvironment {
protected AgentBasedEnvironmentTestImplementation(Configuration config) {
super(config);
}

@Override
public boolean probe() {
return false;
}

@Override
public String getType() {
return null;
}

@Override
public void configureContext(MetricsContext context) {}
}

private Configuration configuration;

@Before
public void setup() {
this.configuration = new Configuration();
}

@Test
public void testGetSinkWithDefaultEndpoint() throws Exception {
AgentSink mockedSink = mock(AgentSink.class);
PowerMockito.whenNew(AgentSink.class)
.withAnyArguments()
.then(
invocation -> {
Endpoint endpoint = invocation.getArgument(2);
assertEquals(Endpoint.DEFAULT_TCP_ENDPOINT, endpoint);
return mockedSink;
});

AgentBasedEnvironment env = new AgentBasedEnvironmentTestImplementation(configuration);
ISink sink = env.getSink();

assertEquals(mockedSink, sink);
}

@Test
public void testGetSinkWithConfiguredEndpoint() throws Exception {
String endpointUrl = "http://configured-endpoint:1234";
configuration.setAgentEndpoint(endpointUrl);
AgentSink mockedSink = mock(AgentSink.class);
PowerMockito.whenNew(AgentSink.class)
.withAnyArguments()
.then(
invocation -> {
Endpoint endpoint = invocation.getArgument(2);
assertEquals(Endpoint.fromURL(endpointUrl), endpoint);
return mockedSink;
});

AgentBasedEnvironment env = new AgentBasedEnvironmentTestImplementation(configuration);
ISink sink = env.getSink();

assertEquals(mockedSink, sink);
}

@Test
public void testGetSinkOverrideToStdOut() {
configuration.setShouldWriteToStdout(true);

AgentBasedEnvironment env = new AgentBasedEnvironmentTestImplementation(configuration);
ISink sink = env.getSink();

assertEquals(ConsoleSink.class, sink.getClass());
}

@Test
public void testGetSinkOverrideToStdOutFailFastOnImproperOverride() throws Exception {
configuration.setShouldWriteToStdout(false);

testGetSinkWithDefaultEndpoint();
}
}