Skip to content

Commit 7480667

Browse files
authored
fix(node-http-handler): http2 lets node exit (#3541)
Http2Session keeps node alive even with no requests running, presumably for server pushed streams. This is a behavior change from node's http 1 Agent, which does not keep node alive when there is no request running. Emulate that behavior by unref()ing new sessions, and wrapping requests in a ref() / unref() pair.
1 parent 473f9b5 commit 7480667

File tree

2 files changed

+109
-14
lines changed

2 files changed

+109
-14
lines changed

packages/node-http-handler/src/node-http2-handler.spec.ts

+102-14
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { AbortController } from "@aws-sdk/abort-controller";
22
import { HttpRequest } from "@aws-sdk/protocol-http";
33
import { rejects } from "assert";
4-
import http2, { constants, Http2Stream } from "http2";
4+
import http2, { ClientHttp2Session, ClientHttp2Stream, constants, Http2Stream } from "http2";
55
import { Duplex } from "stream";
66
import { promisify } from "util";
77

@@ -44,7 +44,20 @@ describe(NodeHttp2Handler.name, () => {
4444
});
4545

4646
describe("without options", () => {
47+
let createdSessions!: ClientHttp2Session[];
48+
const connectReal = http2.connect;
49+
let connectSpy!: jest.SpiedFunction<typeof http2.connect>;
50+
4751
beforeEach(() => {
52+
createdSessions = [];
53+
connectSpy = jest.spyOn(http2, "connect").mockImplementation((...args) => {
54+
const session = connectReal(...args);
55+
jest.spyOn(session, "ref");
56+
jest.spyOn(session, "unref");
57+
createdSessions.push(session);
58+
return session;
59+
});
60+
4861
nodeH2Handler = new NodeHttp2Handler();
4962
});
5063

@@ -58,51 +71,99 @@ describe(NodeHttp2Handler.name, () => {
5871

5972
describe("number calls to http2.connect", () => {
6073
it("is zero on initialization", () => {
61-
const connectSpy = jest.spyOn(http2, "connect");
6274
expect(connectSpy).not.toHaveBeenCalled();
6375
});
6476

6577
it("is one when request is made", async () => {
66-
const connectSpy = jest.spyOn(http2, "connect");
67-
6878
// Make single request.
69-
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
79+
const response = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
80+
const responseBody = response.response.body as ClientHttp2Stream;
7081

7182
const authority = `${protocol}//${hostname}:${port}`;
7283
expect(connectSpy).toHaveBeenCalledTimes(1);
7384
expect(connectSpy).toHaveBeenCalledWith(authority);
85+
86+
// Keeping node alive while request is open.
87+
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
88+
expect(createdSessions[0].unref).toHaveBeenCalledTimes(1);
89+
90+
const closed = new Promise<void>((resolve) => responseBody.once("close", resolve));
91+
(response.response.body as ClientHttp2Stream).destroy();
92+
await closed;
93+
94+
// No longer keeping node alive
95+
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
96+
expect(createdSessions[0].unref).toHaveBeenCalledTimes(2);
7497
});
7598

7699
it("is one if multiple requests are made on same URL", async () => {
77100
const connectSpy = jest.spyOn(http2, "connect");
78101

79102
// Make two requests.
80-
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
81-
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
103+
const response1 = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
104+
const response1Body = response1.response.body as ClientHttp2Stream;
105+
const response2 = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
106+
const response2Body = response2.response.body as ClientHttp2Stream;
82107

83108
const authority = `${protocol}//${hostname}:${port}`;
84109
expect(connectSpy).toHaveBeenCalledTimes(1);
85110
expect(connectSpy).toHaveBeenCalledWith(authority);
111+
112+
// Keeping node alive while requests are open.
113+
expect(createdSessions[0].ref).toHaveBeenCalledTimes(2);
114+
expect(createdSessions[0].unref).toHaveBeenCalledTimes(1);
115+
116+
const closed1 = new Promise<void>((resolve) => response1Body.once("close", resolve));
117+
response1Body.destroy();
118+
await closed1;
119+
const closed2 = new Promise<void>((resolve) => response2Body.once("close", resolve));
120+
response2Body.destroy();
121+
await closed2;
122+
123+
// No longer keeping node alive
124+
expect(createdSessions[0].ref).toHaveBeenCalledTimes(2);
125+
expect(createdSessions[0].unref).toHaveBeenCalledTimes(3);
86126
});
87127

88128
it("is many if requests are made on different URLs", async () => {
89129
const connectSpy = jest.spyOn(http2, "connect");
90130

91131
// Make first request on default URL.
92-
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
132+
const response1 = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
133+
const response1Body = response1.response.body as ClientHttp2Stream;
93134

94135
const port2 = port + 1;
95136
const mockH2Server2 = createMockHttp2Server().listen(port2);
96137
mockH2Server2.on("request", createResponseFunction(mockResponse));
97138

98139
// Make second request on URL with port2.
99-
await nodeH2Handler.handle(new HttpRequest({ ...getMockReqOptions(), port: port2 }), {});
140+
const response2 = await nodeH2Handler.handle(new HttpRequest({ ...getMockReqOptions(), port: port2 }), {});
141+
const response2Body = response2.response.body as ClientHttp2Stream;
100142

101143
const authorityPrefix = `${protocol}//${hostname}`;
102144
expect(connectSpy).toHaveBeenCalledTimes(2);
103145
expect(connectSpy).toHaveBeenNthCalledWith(1, `${authorityPrefix}:${port}`);
104146
expect(connectSpy).toHaveBeenNthCalledWith(2, `${authorityPrefix}:${port2}`);
105147
mockH2Server2.close();
148+
149+
// Keeping node alive while requests are open.
150+
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
151+
expect(createdSessions[0].unref).toHaveBeenCalledTimes(1);
152+
expect(createdSessions[1].ref).toHaveBeenCalledTimes(1);
153+
expect(createdSessions[1].unref).toHaveBeenCalledTimes(1);
154+
155+
const closed1 = new Promise<void>((resolve) => response1Body.once("close", resolve));
156+
response1Body.destroy();
157+
await closed1;
158+
const closed2 = new Promise<void>((resolve) => response2Body.once("close", resolve));
159+
response2Body.destroy();
160+
await closed2;
161+
162+
// No longer keeping node alive
163+
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
164+
expect(createdSessions[0].unref).toHaveBeenCalledTimes(2);
165+
expect(createdSessions[1].ref).toHaveBeenCalledTimes(1);
166+
expect(createdSessions[1].unref).toHaveBeenCalledTimes(2);
106167
});
107168
});
108169

@@ -151,26 +212,44 @@ describe(NodeHttp2Handler.name, () => {
151212
expect(establishedConnections).toBe(3);
152213
expect(numRequests).toBe(3);
153214

215+
// Not keeping node alive
216+
expect(createdSessions).toHaveLength(3);
217+
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
218+
expect(createdSessions[0].unref).toHaveBeenCalledTimes(2);
219+
expect(createdSessions[1].ref).toHaveBeenCalledTimes(1);
220+
expect(createdSessions[1].unref).toHaveBeenCalledTimes(2);
221+
expect(createdSessions[2].ref).toHaveBeenCalledTimes(1);
222+
expect(createdSessions[2].unref).toHaveBeenCalledTimes(2);
223+
154224
// should be able to recover from goaway after reconnecting to a server
155225
// that doesn't send goaway, and reuse the TCP connection (Http2Session)
156226
shouldSendGoAway = false;
157227
mockH2Server3.on("request", createResponseFunction(mockResponse));
158-
await nodeH2Handler.handle(req, {});
159228
const result = await nodeH2Handler.handle(req, {});
160229
const resultReader = result.response.body;
161230

231+
// Keeping node alive
232+
expect(createdSessions).toHaveLength(4);
233+
expect(createdSessions[3].ref).toHaveBeenCalledTimes(1);
234+
expect(createdSessions[3].unref).toHaveBeenCalledTimes(1);
235+
162236
// ...and validate that the mocked response is received
163237
const responseBody = await new Promise((resolve) => {
164238
const buffers = [];
165239
resultReader.on("data", (chunk) => buffers.push(chunk));
166-
resultReader.on("end", () => {
240+
resultReader.on("close", () => {
167241
resolve(Buffer.concat(buffers).toString("utf8"));
168242
});
169243
});
170244
expect(responseBody).toBe("test");
171245
expect(establishedConnections).toBe(4);
172-
expect(numRequests).toBe(5);
246+
expect(numRequests).toBe(4);
173247
mockH2Server3.close();
248+
249+
// Not keeping node alive
250+
expect(createdSessions).toHaveLength(4);
251+
expect(createdSessions[3].ref).toHaveBeenCalledTimes(1);
252+
expect(createdSessions[3].unref).toHaveBeenCalledTimes(2);
174253
});
175254

176255
it("handles connections destroyed by servers", async () => {
@@ -212,6 +291,15 @@ describe(NodeHttp2Handler.name, () => {
212291
expect(establishedConnections).toBe(3);
213292
expect(numRequests).toBe(3);
214293
mockH2Server3.close();
294+
295+
// Not keeping node alive
296+
expect(createdSessions).toHaveLength(3);
297+
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
298+
expect(createdSessions[0].unref).toHaveBeenCalledTimes(2);
299+
expect(createdSessions[1].ref).toHaveBeenCalledTimes(1);
300+
expect(createdSessions[1].unref).toHaveBeenCalledTimes(2);
301+
expect(createdSessions[2].ref).toHaveBeenCalledTimes(1);
302+
expect(createdSessions[2].unref).toHaveBeenCalledTimes(2);
215303
});
216304
});
217305

@@ -380,7 +468,7 @@ describe(NodeHttp2Handler.name, () => {
380468
const authority = `${protocol}//${hostname}:${port}`;
381469
// @ts-ignore: access private property
382470
const session: ClientHttp2Session = nodeH2Handler.sessionCache.get(authority)[0];
383-
const fakeStream = new Duplex();
471+
const fakeStream = new Duplex() as ClientHttp2Stream;
384472
const fakeRstCode = 1;
385473
// @ts-ignore: fake result code
386474
fakeStream.rstCode = fakeRstCode;
@@ -405,7 +493,7 @@ describe(NodeHttp2Handler.name, () => {
405493
const authority = `${protocol}//${hostname}:${port}`;
406494
// @ts-ignore: access private property
407495
const session: ClientHttp2Session = nodeH2Handler.sessionCache.get(authority)[0];
408-
const fakeStream = new Duplex();
496+
const fakeStream = new Duplex() as ClientHttp2Stream;
409497
jest.spyOn(session, "request").mockImplementation(() => fakeStream);
410498
// @ts-ignore: access private property
411499
nodeH2Handler.sessionCache.set(`${protocol}//${hostname}:${port}`, [session]);

packages/node-http-handler/src/node-http2-handler.ts

+7
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ export class NodeHttp2Handler implements HttpHandler {
8989
[constants.HTTP2_HEADER_METHOD]: method,
9090
});
9191

92+
// Keep node alive while request is in progress. Matched with unref() in close event.
93+
session.ref();
94+
9295
req.on("response", (headers) => {
9396
const httpResponse = new HttpResponse({
9497
statusCode: headers[":status"] || -1,
@@ -137,6 +140,7 @@ export class NodeHttp2Handler implements HttpHandler {
137140
// http2stream.rstCode property. If the code is any value other than NGHTTP2_NO_ERROR (0),
138141
// an 'error' event will have also been emitted.
139142
req.on("close", () => {
143+
session.unref();
140144
if (this.disableConcurrentStreams) {
141145
session.destroy();
142146
}
@@ -164,6 +168,9 @@ export class NodeHttp2Handler implements HttpHandler {
164168
if (existingSessions.length > 0 && !disableConcurrentStreams) return existingSessions[0];
165169

166170
const newSession = connect(authority);
171+
// AWS SDK does not expect server push streams, don't keep node alive without a request.
172+
newSession.unref();
173+
167174
const destroySessionCb = () => {
168175
this.destroySession(newSession);
169176
this.deleteSessionFromCache(authority, newSession);

0 commit comments

Comments
 (0)