forked from aws/aws-lambda-nodejs-runtime-interface-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathStreamingContext.js
83 lines (73 loc) · 2.46 KB
/
StreamingContext.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/**
* Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*/
'use strict';
const BeforeExitListener = require('./BeforeExitListener.js');
const { InvalidStreamingOperation } = require('./Errors');
const { verbose, vverbose } = require('./VerboseLog.js').logger('STREAM');
const { tryCallFail } = require('./ResponseStream');
const { structuredConsole } = require('./LogPatch');
/**
* Construct the base-context object which includes the required flags and
* callback methods for the Node programming model.
* @param client {RAPIDClient}
* The RAPID client used to post results/errors.
* @param id {string}
* The invokeId for the current invocation.
* @param scheduleNext {function}
* A function which takes no params and immediately schedules the next
* iteration of the invoke loop.
* @param options {object}
* An object with optional properties for streaming.
* @return {context}
* Context object that has the createStream function.
*/
module.exports.build = function (client, id, scheduleNext, options) {
let waitForEmptyEventLoop = true;
const scheduleNextNow = () => {
verbose('StreamingContext::scheduleNextNow entered');
if (!waitForEmptyEventLoop) {
scheduleNext();
} else {
BeforeExitListener.set(() => {
setImmediate(() => {
scheduleNext();
});
});
}
};
let isStreamCreated = false;
const streamingContext = {
get callbackWaitsForEmptyEventLoop() {
return waitForEmptyEventLoop;
},
set callbackWaitsForEmptyEventLoop(value) {
waitForEmptyEventLoop = value;
},
createStream: (callback) => {
if (isStreamCreated) {
throw new InvalidStreamingOperation(
'Cannot create stream for the same StreamingContext more than once.',
);
}
const { request: responseStream, responseDone: rapidResponse } =
client.getStreamForInvocationResponse(id, callback, options);
isStreamCreated = true;
vverbose('StreamingContext::createStream stream created');
return {
fail: (err, callback) => {
structuredConsole.logError('Invoke Error', err);
tryCallFail(responseStream, err, callback);
},
responseStream,
rapidResponse,
scheduleNext: () => {
verbose('StreamingContext::createStream scheduleNext');
BeforeExitListener.reset();
scheduleNextNow();
},
};
},
};
return streamingContext;
};