Skip to content

Commit 49d745e

Browse files
committed
Add functional and unit tests to verify async core metrics for event streaming operation
1 parent 1f53f9c commit 49d745e

File tree

4 files changed

+171
-19
lines changed

4 files changed

+171
-19
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/util/MetricUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.core.internal.util;
1717

1818
import static software.amazon.awssdk.core.client.config.SdkClientOption.METRIC_PUBLISHER;
19+
import static software.amazon.awssdk.core.http.HttpResponseHandler.X_AMZN_REQUEST_ID_HEADER;
1920

2021
import java.time.Duration;
2122
import java.util.Optional;
@@ -106,6 +107,8 @@ public static void collectHttpMetrics(MetricCollector metricCollector, SdkHttpFu
106107
metricCollector.reportMetric(CoreMetric.HTTP_STATUS_CODE, httpResponse.statusCode());
107108
httpResponse.firstMatchingHeader("x-amz-request-id")
108109
.ifPresent(v -> metricCollector.reportMetric(CoreMetric.AWS_REQUEST_ID, v));
110+
httpResponse.firstMatchingHeader(X_AMZN_REQUEST_ID_HEADER)
111+
.ifPresent(v -> metricCollector.reportMetric(CoreMetric.AWS_REQUEST_ID, v));
109112
httpResponse.firstMatchingHeader("x-amz-id-2")
110113
.ifPresent(v -> metricCollector.reportMetric(CoreMetric.AWS_EXTENDED_REQUEST_ID, v));
111114
}

services/transcribestreaming/src/it/java/software/amazon/awssdk/services/transcribestreaming/TranscribeStreamingIntegrationTest.java

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,21 @@
1616

1717
import static org.assertj.core.api.Assertions.assertThat;
1818
import static org.junit.Assert.assertTrue;
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.verify;
1921
import static software.amazon.awssdk.http.Header.CONTENT_TYPE;
2022

2123
import java.io.File;
2224
import java.io.FileInputStream;
2325
import java.io.FileNotFoundException;
2426
import java.io.InputStream;
25-
import java.net.URISyntaxException;
27+
import java.time.Duration;
2628
import java.util.List;
2729
import java.util.concurrent.CompletableFuture;
2830
import java.util.concurrent.ExecutionException;
2931
import org.junit.BeforeClass;
3032
import org.junit.Test;
33+
import org.mockito.ArgumentCaptor;
3134
import org.reactivestreams.Publisher;
3235
import org.reactivestreams.Subscriber;
3336
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -36,6 +39,7 @@
3639
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
3740
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
3841
import software.amazon.awssdk.core.internal.util.Mimetype;
42+
import software.amazon.awssdk.core.metrics.CoreMetric;
3943
import software.amazon.awssdk.metrics.MetricCollection;
4044
import software.amazon.awssdk.metrics.MetricPublisher;
4145
import software.amazon.awssdk.regions.Region;
@@ -44,6 +48,7 @@
4448
import software.amazon.awssdk.services.transcribestreaming.model.MediaEncoding;
4549
import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionRequest;
4650
import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponseHandler;
51+
import software.amazon.awssdk.utils.Logger;
4752

4853
/**
4954
* An example test class to show the usage of
@@ -53,43 +58,36 @@
5358
* The audio files used in this class don't have voice, so there won't be any transcripted text would be empty
5459
*/
5560
public class TranscribeStreamingIntegrationTest {
61+
private static final Logger log = Logger.loggerFor(TranscribeStreamingIntegrationTest.class);
5662

5763
private static TranscribeStreamingAsyncClient client;
5864

65+
private static MetricPublisher mockPublisher;
66+
5967
@BeforeClass
60-
public static void setup() throws URISyntaxException {
68+
public static void setup() {
69+
mockPublisher = mock(MetricPublisher.class);
6170
client = TranscribeStreamingAsyncClient.builder()
6271
.region(Region.US_EAST_1)
6372
.overrideConfiguration(b -> b.addExecutionInterceptor(new VerifyHeaderInterceptor())
64-
.metricPublisher(new MetricPublisher() {
65-
@Override
66-
public void publish(MetricCollection metricCollection) {
67-
System.out.println(metricCollection);
68-
}
69-
70-
@Override
71-
public void close() {
72-
73-
}
74-
})
75-
)
73+
.metricPublisher(mockPublisher))
7674
.credentialsProvider(getCredentials())
7775
.build();
7876
}
7977

8078
@Test
81-
public void testFileWith16kRate() throws ExecutionException, InterruptedException, URISyntaxException {
79+
public void testFileWith16kRate() throws InterruptedException {
8280
CompletableFuture<Void> result = client.startStreamTranscription(getRequest(16_000),
8381
new AudioStreamPublisher(
8482
getInputStream("silence_16kHz_s16le.wav")),
8583
TestResponseHandlers.responseHandlerBuilder_Classic());
8684

87-
// Blocking call to keep the main thread for shutting down
88-
result.get();
85+
result.join();
86+
verifyMetrics();
8987
}
9088

9189
@Test
92-
public void testFileWith8kRate() throws ExecutionException, InterruptedException, URISyntaxException {
90+
public void testFileWith8kRate() throws ExecutionException, InterruptedException {
9391
CompletableFuture<Void> result = client.startStreamTranscription(getRequest(8_000),
9492
new AudioStreamPublisher(
9593
getInputStream("silence_8kHz_s16le.wav")),
@@ -143,4 +141,34 @@ public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttr
143141
assertThat(contentTypeHeader.get(0)).isEqualTo(Mimetype.MIMETYPE_EVENT_STREAM);
144142
}
145143
}
144+
145+
private void verifyMetrics() throws InterruptedException {
146+
// wait for 100ms for metrics to be delivered to mockPublisher
147+
Thread.sleep(100);
148+
ArgumentCaptor<MetricCollection> collectionCaptor = ArgumentCaptor.forClass(MetricCollection.class);
149+
verify(mockPublisher).publish(collectionCaptor.capture());
150+
MetricCollection capturedCollection = collectionCaptor.getValue();
151+
assertThat(capturedCollection.name()).isEqualTo("ApiCall");
152+
log.info(() -> "captured collection: " + capturedCollection);
153+
154+
assertThat(capturedCollection.metricValues(CoreMetric.CREDENTIALS_FETCH_DURATION).get(0))
155+
.isGreaterThanOrEqualTo(Duration.ZERO);
156+
assertThat(capturedCollection.metricValues(CoreMetric.MARSHALLING_DURATION).get(0))
157+
.isGreaterThanOrEqualTo(Duration.ZERO);
158+
assertThat(capturedCollection.metricValues(CoreMetric.API_CALL_DURATION).get(0))
159+
.isGreaterThan(Duration.ZERO);
160+
161+
MetricCollection attemptCollection = capturedCollection.children().get(0);
162+
assertThat(attemptCollection.name()).isEqualTo("ApiCallAttempt");
163+
assertThat(attemptCollection.children()).isEmpty();
164+
assertThat(attemptCollection.metricValues(CoreMetric.HTTP_STATUS_CODE))
165+
.containsExactly(200);
166+
assertThat(attemptCollection.metricValues(CoreMetric.SIGNING_DURATION).get(0))
167+
.isGreaterThanOrEqualTo(Duration.ZERO);
168+
assertThat(attemptCollection.metricValues(CoreMetric.AWS_REQUEST_ID).get(0)).isNotEmpty();
169+
170+
assertThat(attemptCollection.metricValues(CoreMetric.HTTP_REQUEST_ROUND_TRIP_TIME).get(0))
171+
.isGreaterThanOrEqualTo(Duration.ofMillis(100));
172+
}
173+
146174
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.metrics.async;
17+
18+
import static org.mockito.Mockito.when;
19+
20+
import com.github.tomakehurst.wiremock.junit.WireMockRule;
21+
import java.net.URI;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.function.Supplier;
24+
import org.junit.After;
25+
import org.junit.Before;
26+
import org.junit.Rule;
27+
import org.junit.runner.RunWith;
28+
import org.mockito.Mock;
29+
import org.mockito.runners.MockitoJUnitRunner;
30+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
31+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
32+
import software.amazon.awssdk.core.async.EmptyPublisher;
33+
import software.amazon.awssdk.core.signer.NoOpSigner;
34+
import software.amazon.awssdk.metrics.MetricPublisher;
35+
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient;
36+
import software.amazon.awssdk.services.protocolrestjson.model.EventStreamOperationRequest;
37+
import software.amazon.awssdk.services.protocolrestjson.model.EventStreamOperationResponseHandler;
38+
39+
/**
40+
* Core metrics test for async streaming API
41+
*/
42+
@RunWith(MockitoJUnitRunner.class)
43+
public class AsyncEventStreamingCoreMetricsTest extends BaseAsyncCoreMetricsTest {
44+
@Rule
45+
public WireMockRule wireMock = new WireMockRule(0);
46+
47+
@Mock
48+
private AwsCredentialsProvider mockCredentialsProvider;
49+
50+
@Mock
51+
private MetricPublisher mockPublisher;
52+
53+
54+
private ProtocolRestJsonAsyncClient client;
55+
56+
@Before
57+
public void setup() {
58+
client = ProtocolRestJsonAsyncClient.builder()
59+
.credentialsProvider(mockCredentialsProvider)
60+
.endpointOverride(URI.create("http://localhost:" + wireMock.port()))
61+
.overrideConfiguration(c -> c.metricPublisher(mockPublisher)
62+
.retryPolicy(b -> b.numRetries(MAX_RETRIES)))
63+
.build();
64+
65+
when(mockCredentialsProvider.resolveCredentials()).thenAnswer(invocation -> {
66+
try {
67+
Thread.sleep(100);
68+
} catch (InterruptedException ie) {
69+
ie.printStackTrace();
70+
}
71+
return AwsBasicCredentials.create("foo", "bar");
72+
});
73+
}
74+
75+
@After
76+
public void teardown() {
77+
wireMock.resetAll();
78+
if (client != null) {
79+
client.close();
80+
}
81+
client = null;
82+
}
83+
84+
@Override
85+
String operationName() {
86+
return "EventStreamOperation";
87+
}
88+
89+
@Override
90+
Supplier<CompletableFuture<?>> callable() {
91+
return () -> client.eventStreamOperation(EventStreamOperationRequest.builder().overrideConfiguration(b -> b.signer(new NoOpSigner())).build(),
92+
new EmptyPublisher<>(),
93+
EventStreamOperationResponseHandler.builder()
94+
.subscriber(b -> {})
95+
.build());
96+
}
97+
98+
@Override
99+
MetricPublisher publisher() {
100+
return mockPublisher;
101+
}
102+
103+
void addDelayIfNeeded() {
104+
try {
105+
Thread.sleep(100);
106+
} catch (InterruptedException e) {
107+
e.printStackTrace();
108+
}
109+
}
110+
}

test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/metrics/async/BaseAsyncCoreMetricsTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ public abstract class BaseAsyncCoreMetricsTest {
5050
@Test
5151
public void apiCall_operationSuccessful_addsMetrics() {
5252
stubSuccessfulResponse();
53-
5453
callable().get().join();
54+
addDelayIfNeeded();
5555

5656
ArgumentCaptor<MetricCollection> collectionCaptor = ArgumentCaptor.forClass(MetricCollection.class);
5757
verify(publisher()).publish(collectionCaptor.capture());
@@ -74,6 +74,7 @@ public void apiCall_operationSuccessful_addsMetrics() {
7474
public void apiCall_allRetryAttemptsFailedOf500() {
7575
stubErrorResponse();
7676
assertThatThrownBy(() -> callable().get().join()).hasCauseInstanceOf(EmptyModeledException.class);
77+
addDelayIfNeeded();
7778

7879
ArgumentCaptor<MetricCollection> collectionCaptor = ArgumentCaptor.forClass(MetricCollection.class);
7980
verify(publisher()).publish(collectionCaptor.capture());
@@ -89,6 +90,7 @@ public void apiCall_allRetryAttemptsFailedOf500() {
8990
public void apiCall_allRetryAttemptsFailedOfNetworkError() {
9091
stubNetworkError();
9192
assertThatThrownBy(() -> callable().get().join()).hasCauseInstanceOf(SdkClientException.class);
93+
addDelayIfNeeded();
9294

9395
ArgumentCaptor<MetricCollection> collectionCaptor = ArgumentCaptor.forClass(MetricCollection.class);
9496
verify(publisher()).publish(collectionCaptor.capture());
@@ -115,6 +117,7 @@ public void apiCall_allRetryAttemptsFailedOfNetworkError() {
115117
public void apiCall_firstAttemptFailedRetrySucceeded() {
116118
stubSuccessfulRetry();
117119
callable().get().join();
120+
addDelayIfNeeded();
118121

119122
ArgumentCaptor<MetricCollection> collectionCaptor = ArgumentCaptor.forClass(MetricCollection.class);
120123
verify(publisher()).publish(collectionCaptor.capture());
@@ -130,6 +133,14 @@ public void apiCall_firstAttemptFailedRetrySucceeded() {
130133
verifySuccessfulApiCallAttemptCollection(successfulAttempt);
131134
}
132135

136+
/**
137+
* Adds delay after calling CompletableFuture.join to wait for publisher to get metrics.
138+
* no op by default, can be overridden by subclasses
139+
*/
140+
void addDelayIfNeeded() {
141+
// no op by default
142+
}
143+
133144
abstract String operationName();
134145

135146
abstract Supplier<CompletableFuture<?>> callable();

0 commit comments

Comments
 (0)