Skip to content

Commit af41a53

Browse files
authored
feat: support bi-directional eventstream over H2 (#1082)
* Add AddHttp2Dependency.java to add Http/2 request handler to TranscribeStreaming client. Currently only NodeJs requires special request handler for Http/2. * Add AddEventStreamHandlingDependency.java to support signing event stream in requests with SigV4 streaming spec * Add eventstream-handler-node and middleware-eventstream for JS implementation singing eventstream requests and handling eventstream flow control during retry. * Create a runtime-agnostic, async iterable based eventstream serialization and deserialization implementation in package eventstream-serde-universal. Nodejs-specific package eventstream-serde-node is a wrapper over eventstream-serde-universal with Nodejs stream interface.
1 parent 8eff087 commit af41a53

File tree

79 files changed

+3722
-1024
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+3722
-1024
lines changed

Diff for: clients/client-transcribe-streaming/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ package-lock.json
1111

1212
*.d.ts
1313
*.js
14+
!jest*.config.js
1415
*.js.map

Diff for: clients/client-transcribe-streaming/.npmignore

+2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
/coverage/
22
/docs/
3+
/test/
34
tsconfig.test.json
45
*.tsbuildinfo
6+
jest.config.js

Diff for: clients/client-transcribe-streaming/TranscribeStreamingClient.ts

+17-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ import {
1717
resolveEventStreamSerdeConfig
1818
} from "@aws-sdk/eventstream-serde-config-resolver";
1919
import { getContentLengthPlugin } from "@aws-sdk/middleware-content-length";
20+
import {
21+
EventStreamInputConfig,
22+
EventStreamResolvedConfig,
23+
resolveEventStreamConfig
24+
} from "@aws-sdk/middleware-eventstream";
2025
import {
2126
HostHeaderInputConfig,
2227
HostHeaderResolvedConfig,
@@ -52,6 +57,7 @@ import {
5257
Credentials as __Credentials,
5358
Decoder as __Decoder,
5459
Encoder as __Encoder,
60+
EventStreamPayloadHandlerProvider as __EventStreamPayloadHandlerProvider,
5561
EventStreamSerdeProvider as __EventStreamSerdeProvider,
5662
HashConstructor as __HashConstructor,
5763
HttpHandlerOptions as __HttpHandlerOptions,
@@ -148,6 +154,11 @@ export interface ClientDefaults
148154
*/
149155
regionInfoProvider?: RegionInfoProvider;
150156

157+
/**
158+
* The function that provides necessary utilities for handling request event stream.
159+
*/
160+
eventStreamPayloadHandlerProvider?: __EventStreamPayloadHandlerProvider;
161+
151162
/**
152163
* The function that provides necessary utilities for generating and parsing event stream
153164
*/
@@ -164,6 +175,7 @@ export type TranscribeStreamingClientConfig = Partial<
164175
RetryInputConfig &
165176
UserAgentInputConfig &
166177
HostHeaderInputConfig &
178+
EventStreamInputConfig &
167179
EventStreamSerdeInputConfig;
168180

169181
export type TranscribeStreamingClientResolvedConfig = __SmithyResolvedConfiguration<
@@ -176,6 +188,7 @@ export type TranscribeStreamingClientResolvedConfig = __SmithyResolvedConfigurat
176188
RetryResolvedConfig &
177189
UserAgentResolvedConfig &
178190
HostHeaderResolvedConfig &
191+
EventStreamResolvedConfig &
179192
EventStreamSerdeResolvedConfig;
180193

181194
/**
@@ -200,9 +213,10 @@ export class TranscribeStreamingClient extends __Client<
200213
let _config_4 = resolveRetryConfig(_config_3);
201214
let _config_5 = resolveUserAgentConfig(_config_4);
202215
let _config_6 = resolveHostHeaderConfig(_config_5);
203-
let _config_7 = resolveEventStreamSerdeConfig(_config_6);
204-
super(_config_7);
205-
this.config = _config_7;
216+
let _config_7 = resolveEventStreamConfig(_config_6);
217+
let _config_8 = resolveEventStreamSerdeConfig(_config_7);
218+
super(_config_8);
219+
this.config = _config_8;
206220
this.middlewareStack.use(getAwsAuthPlugin(this.config));
207221
this.middlewareStack.use(getRetryPlugin(this.config));
208222
this.middlewareStack.use(getUserAgentPlugin(this.config));

Diff for: clients/client-transcribe-streaming/commands/StartStreamTranscriptionCommand.ts

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
deserializeAws_restJson1_1StartStreamTranscriptionCommand,
1212
serializeAws_restJson1_1StartStreamTranscriptionCommand
1313
} from "../protocols/Aws_restJson1_1";
14+
import { getEventStreamPlugin } from "@aws-sdk/middleware-eventstream";
1415
import { getSerdePlugin } from "@aws-sdk/middleware-serde";
1516
import {
1617
HttpRequest as __HttpRequest,
@@ -57,6 +58,7 @@ export class StartStreamTranscriptionCommand extends $Command<
5758
this.middlewareStack.use(
5859
getSerdePlugin(configuration, this.serialize, this.deserialize)
5960
);
61+
this.middlewareStack.use(getEventStreamPlugin(configuration));
6062

6163
const stack = clientStack.concat(this.middlewareStack);
6264

Diff for: clients/client-transcribe-streaming/jest.config.js

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
const base = require("../../jest.config.base.js");
2+
3+
module.exports = {
4+
...base,
5+
// Only test cjs dist, avoid testing the package twice
6+
testPathIgnorePatterns: ["/node_modules/", "/es/", ".*.integ.spec.js"],
7+
coveragePathIgnorePatterns: [
8+
"/node_modules/",
9+
"/commands/",
10+
"/protocols/", // protocols tested in protocol protocol_tests folder
11+
"endpoints" // endpoint tested in tests/functional/endpoints
12+
]
13+
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
const base = require("../../jest.config.base.js");
2+
3+
module.exports = {
4+
...base,
5+
testMatch: ["**/*.integ.spec.js"]
6+
};

Diff for: clients/client-transcribe-streaming/package.json

+9-4
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,18 @@
66
"clean": "npm run remove-definitions && npm run remove-dist && npm run remove-js && npm run remove-maps",
77
"build-documentation": "npm run clean && typedoc ./",
88
"prepublishOnly": "yarn build",
9-
"pretest": "tsc",
9+
"pretest": "yarn build",
1010
"remove-definitions": "rimraf ./types",
1111
"remove-dist": "rimraf ./dist",
1212
"remove-documentation": "rimraf ./docs",
1313
"remove-js": "rimraf *.js && rimraf ./commands/*.js && rimraf ./models/*.js && rimraf ./protocols/*.js",
1414
"remove-maps": "rimraf *.js.map && rimraf ./commands/*.js.map && rimraf ./models/*.js.map && rimraf ./protocols/*.js.map",
15-
"test": "exit 0",
15+
"test": "jest --coverage --passWithNoTests",
16+
"test:integration": "jest --config jest.integ.config.js",
17+
"build:cjs": "tsc",
1618
"build:es": "tsc -p tsconfig.es.json",
17-
"build": "yarn pretest && yarn build:es"
19+
"build": "yarn build:cjs && yarn build:es",
20+
"postbuild": "cp test/speech.wav dist/cjs/test"
1821
},
1922
"main": "./dist/cjs/index.js",
2023
"types": "./types/index.d.ts",
@@ -31,13 +34,15 @@
3134
"@aws-crypto/sha256-js": "^1.0.0-alpha.0",
3235
"@aws-sdk/config-resolver": "1.0.0-gamma.1",
3336
"@aws-sdk/credential-provider-node": "1.0.0-gamma.1",
37+
"@aws-sdk/eventstream-handler-node": "1.0.0-gamma.0",
3438
"@aws-sdk/eventstream-serde-browser": "1.0.0-gamma.1",
3539
"@aws-sdk/eventstream-serde-config-resolver": "1.0.0-gamma.1",
3640
"@aws-sdk/eventstream-serde-node": "1.0.0-gamma.1",
3741
"@aws-sdk/fetch-http-handler": "1.0.0-gamma.2",
3842
"@aws-sdk/hash-node": "1.0.0-gamma.1",
3943
"@aws-sdk/invalid-dependency": "1.0.0-gamma.1",
4044
"@aws-sdk/middleware-content-length": "1.0.0-gamma.1",
45+
"@aws-sdk/middleware-eventstream": "1.0.0-gamma.0",
4146
"@aws-sdk/middleware-host-header": "1.0.0-gamma.1",
4247
"@aws-sdk/middleware-retry": "1.0.0-gamma.1",
4348
"@aws-sdk/middleware-serde": "1.0.0-gamma.1",
@@ -77,4 +82,4 @@
7782
"url": "https://aws.amazon.com/javascript/"
7883
},
7984
"license": "Apache-2.0"
80-
}
85+
}

Diff for: clients/client-transcribe-streaming/protocols/Aws_restJson1_1.ts

-4
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,6 @@ export const serializeAws_restJson1_1StartStreamTranscriptionCommand = async (
5959
serializeAws_restJson1_1AudioStream_event(event, context)
6060
);
6161
}
62-
if (body === undefined) {
63-
body = {};
64-
}
65-
body = JSON.stringify(body);
6662
const { hostname, protocol = "https", port } = await context.endpoint();
6763
return new __HttpRequest({
6864
protocol,

Diff for: clients/client-transcribe-streaming/runtimeConfig.browser.ts

+3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ export const ClientDefaultValues: Required<ClientDefaults> = {
1919
bodyLengthChecker: calculateBodyLength,
2020
credentialDefaultProvider: invalidFunction("Credential is missing") as any,
2121
defaultUserAgent: defaultUserAgent(name, version),
22+
eventStreamPayloadHandlerProvider: () => ({
23+
handle: invalidFunction("event stream request is not supported in browser.")
24+
}),
2225
eventStreamSerdeProvider,
2326
regionDefaultProvider: invalidFunction("Region is missing") as any,
2427
requestHandler: new FetchHttpHandler(),

Diff for: clients/client-transcribe-streaming/runtimeConfig.native.ts

+5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ export const ClientDefaultValues: Required<ClientDefaults> = {
99
...BrowserDefaults,
1010
runtime: "react-native",
1111
defaultUserAgent: `aws-sdk-js-v3-react-native-${name}/${version}`,
12+
eventStreamPayloadHandlerProvider: () => ({
13+
handle: invalidFunction(
14+
"event stream request is not supported in ReactNative."
15+
)
16+
}),
1217
eventStreamSerdeProvider: () => ({
1318
serialize: invalidFunction("event stream is not supported in ReactNative."),
1419
deserialize: invalidFunction(

Diff for: clients/client-transcribe-streaming/runtimeConfig.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import { name, version } from "./package.json";
22
import { defaultProvider as credentialDefaultProvider } from "@aws-sdk/credential-provider-node";
3+
import { eventStreamPayloadHandlerProvider } from "@aws-sdk/eventstream-handler-node";
34
import { eventStreamSerdeProvider } from "@aws-sdk/eventstream-serde-node";
45
import { Hash } from "@aws-sdk/hash-node";
5-
import { NodeHttpHandler, streamCollector } from "@aws-sdk/node-http-handler";
6+
import { NodeHttp2Handler, streamCollector } from "@aws-sdk/node-http-handler";
67
import { defaultProvider as regionDefaultProvider } from "@aws-sdk/region-provider";
78
import { parseUrl } from "@aws-sdk/url-parser-node";
89
import { fromBase64, toBase64 } from "@aws-sdk/util-base64-node";
@@ -20,9 +21,10 @@ export const ClientDefaultValues: Required<ClientDefaults> = {
2021
bodyLengthChecker: calculateBodyLength,
2122
credentialDefaultProvider,
2223
defaultUserAgent: defaultUserAgent(name, version),
24+
eventStreamPayloadHandlerProvider,
2325
eventStreamSerdeProvider,
2426
regionDefaultProvider,
25-
requestHandler: new NodeHttpHandler(),
27+
requestHandler: new NodeHttp2Handler(),
2628
sha256: Hash.bind(null, "sha256"),
2729
streamCollector,
2830
urlParser: parseUrl,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { TranscribeStreaming } from "../index";
2+
import { createReadStream } from "fs";
3+
import { join } from "path";
4+
const audio = createReadStream(join(__dirname, "speech.wav"));
5+
6+
describe("TranscribeStream client", () => {
7+
const client = new TranscribeStreaming({});
8+
afterAll(() => {
9+
client.destroy();
10+
});
11+
12+
it("should stream the transcript", async () => {
13+
const LanguageCode = "en-US";
14+
const MediaEncoding = "pcm";
15+
const MediaSampleRateHertz = 44100;
16+
const result = await client.startStreamTranscription({
17+
LanguageCode,
18+
MediaEncoding,
19+
MediaSampleRateHertz,
20+
AudioStream: (async function* () {
21+
for await (const chunk of audio) {
22+
yield { AudioEvent: { AudioChunk: chunk } };
23+
}
24+
})()
25+
});
26+
expect(result.LanguageCode).toBe(LanguageCode);
27+
expect(result.MediaEncoding).toBe(MediaEncoding);
28+
expect(result.MediaSampleRateHertz).toBe(MediaSampleRateHertz);
29+
expect(result.TranscriptResultStream).toBeDefined();
30+
const transcripts = [];
31+
for await (const event of result.TranscriptResultStream!) {
32+
transcripts.push(event);
33+
}
34+
expect(
35+
transcripts.filter(event => event["TranscriptEvent"]).length
36+
).toBeGreaterThan(0);
37+
}, 60000);
38+
});

Diff for: clients/client-transcribe-streaming/test/speech.wav

2.38 MB
Binary file not shown.

Diff for: clients/client-transcribe-streaming/tsconfig.es.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@
1515
"es2015.symbol.wellknown"
1616
],
1717
"outDir": "dist/es"
18-
}
18+
},
19+
"exclude": ["test"]
1920
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package software.amazon.smithy.aws.typescript.codegen;
2+
3+
import static software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin.Convention.HAS_CONFIG;
4+
import static software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin.Convention.HAS_MIDDLEWARE;
5+
6+
7+
import java.util.Collections;
8+
import java.util.List;
9+
import java.util.Map;
10+
import java.util.Set;
11+
import java.util.function.Consumer;
12+
13+
import software.amazon.smithy.codegen.core.SymbolProvider;
14+
import software.amazon.smithy.model.Model;
15+
import software.amazon.smithy.model.knowledge.EventStreamIndex;
16+
import software.amazon.smithy.model.knowledge.TopDownIndex;
17+
import software.amazon.smithy.model.shapes.OperationShape;
18+
import software.amazon.smithy.model.shapes.ServiceShape;
19+
import software.amazon.smithy.typescript.codegen.LanguageTarget;
20+
import software.amazon.smithy.typescript.codegen.TypeScriptDependency;
21+
import software.amazon.smithy.typescript.codegen.TypeScriptSettings;
22+
import software.amazon.smithy.typescript.codegen.TypeScriptWriter;
23+
import software.amazon.smithy.typescript.codegen.integration.RuntimeClientPlugin;
24+
import software.amazon.smithy.typescript.codegen.integration.TypeScriptIntegration;
25+
import software.amazon.smithy.utils.ListUtils;
26+
import software.amazon.smithy.utils.MapUtils;
27+
28+
/**
29+
* Adds runtime client plugins that handle the eventstream flow in request,
30+
* including eventstream payload signing.
31+
*/
32+
public class AddEventStreamHandlingDependency implements TypeScriptIntegration {
33+
@Override
34+
public List<RuntimeClientPlugin> getClientPlugins() {
35+
return ListUtils.of(
36+
RuntimeClientPlugin.builder()
37+
.withConventions(AwsDependency.MIDDLEWARE_EVENTSTREAM.dependency,
38+
"EventStream", HAS_CONFIG)
39+
.servicePredicate(AddEventStreamHandlingDependency::hasEventStreamInput)
40+
.build(),
41+
RuntimeClientPlugin.builder()
42+
.withConventions(AwsDependency.MIDDLEWARE_EVENTSTREAM.dependency,
43+
"EventStream", HAS_MIDDLEWARE)
44+
.operationPredicate(AddEventStreamHandlingDependency::hasEventStreamInput)
45+
.build()
46+
);
47+
}
48+
49+
@Override
50+
public void addConfigInterfaceFields(
51+
TypeScriptSettings settings,
52+
Model model,
53+
SymbolProvider symbolProvider,
54+
TypeScriptWriter writer
55+
) {
56+
if (hasEventStreamInput(model, settings.getService(model))) {
57+
writer.addImport("EventStreamPayloadHandlerProvider", "__EventStreamPayloadHandlerProvider",
58+
TypeScriptDependency.AWS_SDK_TYPES.packageName);
59+
writer.writeDocs("The function that provides necessary utilities for handling request event stream.");
60+
writer.write("eventStreamPayloadHandlerProvider?: __EventStreamPayloadHandlerProvider;\n");
61+
}
62+
}
63+
64+
@Override
65+
public Map<String, Consumer<TypeScriptWriter>> getRuntimeConfigWriters(
66+
TypeScriptSettings settings,
67+
Model model,
68+
SymbolProvider symbolProvider,
69+
LanguageTarget target
70+
) {
71+
ServiceShape service = settings.getService(model);
72+
if (!hasEventStreamInput(model, service)) {
73+
return Collections.emptyMap();
74+
}
75+
76+
switch (target) {
77+
case NODE:
78+
return MapUtils.of("eventStreamPayloadHandlerProvider", writer -> {
79+
writer.addDependency(AwsDependency.AWS_SDK_EVENTSTREAM_HANDLER_NODE);
80+
writer.addImport("eventStreamPayloadHandlerProvider", "eventStreamPayloadHandlerProvider",
81+
AwsDependency.AWS_SDK_EVENTSTREAM_HANDLER_NODE.packageName);
82+
writer.write("eventStreamPayloadHandlerProvider,");
83+
});
84+
case BROWSER:
85+
/**
86+
* Browser doesn't support streaming requests as of March 2020.
87+
* Each service client needs to support eventstream request in browser individually.
88+
* Services like TranscribeStreaming support it via WebSocket.
89+
*/
90+
return MapUtils.of("eventStreamPayloadHandlerProvider", writer -> {
91+
writer.addDependency(TypeScriptDependency.INVALID_DEPENDENCY);
92+
writer.addImport("invalidFunction", "invalidFunction",
93+
TypeScriptDependency.INVALID_DEPENDENCY.packageName);
94+
writer.openBlock("eventStreamPayloadHandlerProvider: () => ({", "}),", () -> {
95+
writer.write("handle: invalidFunction(\"event stream request is not supported in browser.\"),");
96+
});
97+
});
98+
case REACT_NATIVE:
99+
/**
100+
* ReactNative doesn't support streaming requests as of March 2020.
101+
* Here we don't supply invalidFunction. Each service client needs to support eventstream request
102+
* in RN has to implement a customization providing its own eventStreamSignerProvider
103+
*/
104+
return MapUtils.of("eventStreamPayloadHandlerProvider", writer -> {
105+
writer.addDependency(TypeScriptDependency.INVALID_DEPENDENCY);
106+
writer.addImport("invalidFunction", "invalidFunction",
107+
TypeScriptDependency.INVALID_DEPENDENCY.packageName);
108+
writer.openBlock("eventStreamPayloadHandlerProvider: () => ({", "}),", () -> {
109+
writer.write("handle: invalidFunction(\"event stream request is not supported in ReactNative.\"),");
110+
});
111+
});
112+
default:
113+
return Collections.emptyMap();
114+
}
115+
}
116+
117+
private static boolean hasEventStreamInput(Model model, ServiceShape service) {
118+
TopDownIndex topDownIndex = model.getKnowledge(TopDownIndex.class);
119+
Set<OperationShape> operations = topDownIndex.getContainedOperations(service);
120+
EventStreamIndex eventStreamIndex = model.getKnowledge(EventStreamIndex.class);
121+
for (OperationShape operation : operations) {
122+
if (eventStreamIndex.getInputInfo(operation).isPresent()) {
123+
return true;
124+
}
125+
}
126+
return false;
127+
}
128+
129+
private static boolean hasEventStreamInput(Model model, ServiceShape service, OperationShape operation) {
130+
EventStreamIndex eventStreamIndex = model.getKnowledge(EventStreamIndex.class);
131+
return eventStreamIndex.getInputInfo(operation).isPresent();
132+
}
133+
}

0 commit comments

Comments
 (0)