-
Notifications
You must be signed in to change notification settings - Fork 86
test: refactor serverless invocations so in-process and sandbox implementation use shared code and ability to run multiple invocations in same sandbox #2871
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
4a716ac
70511b7
6e83413
87e1dd6
6b3a40a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,12 @@ import { env } from 'node:process' | |
import { fileURLToPath } from 'node:url' | ||
import { v4 } from 'uuid' | ||
import { LocalServer } from './local-server.js' | ||
import { loadFunction, type FunctionInvocationOptions } from './lambda-helpers.mjs' | ||
import { | ||
type InvokeFunctionResult, | ||
loadFunction, | ||
type LoadFunctionOptions, | ||
type FunctionInvocationOptions, | ||
} from './lambda-helpers.mjs' | ||
|
||
import { glob } from 'fast-glob' | ||
import { | ||
|
@@ -405,48 +410,140 @@ export async function invokeEdgeFunction( | |
}) | ||
} | ||
|
||
export async function invokeSandboxedFunction( | ||
/** | ||
* Load function in child process and allow for multiple invocations | ||
*/ | ||
export async function loadSandboxedFunction( | ||
ctx: FixtureTestContext, | ||
options: Parameters<typeof invokeFunction>[1] = {}, | ||
options: LoadFunctionOptions = {}, | ||
) { | ||
return new Promise<ReturnType<typeof invokeFunction>>((resolve, reject) => { | ||
const childProcess = spawn(process.execPath, [import.meta.dirname + '/sandbox-child.mjs'], { | ||
stdio: ['pipe', 'pipe', 'pipe', 'ipc'], | ||
cwd: process.cwd(), | ||
}) | ||
const childProcess = spawn(process.execPath, [import.meta.dirname + '/sandbox-child.mjs'], { | ||
stdio: ['pipe', 'pipe', 'pipe', 'ipc'], | ||
cwd: join(ctx.functionDist, SERVER_HANDLER_NAME), | ||
env: { | ||
...process.env, | ||
...(options.env || {}), | ||
}, | ||
}) | ||
|
||
childProcess.stdout?.on('data', (data) => { | ||
console.log(data.toString()) | ||
}) | ||
let isRunning = true | ||
let operationCounter = 1 | ||
|
||
childProcess.stderr?.on('data', (data) => { | ||
console.error(data.toString()) | ||
}) | ||
childProcess.stdout?.on('data', (data) => { | ||
console.log(data.toString()) | ||
}) | ||
|
||
childProcess.on('message', (msg: any) => { | ||
if (msg?.action === 'invokeFunctionResult') { | ||
resolve(msg.result) | ||
childProcess.send({ action: 'exit' }) | ||
} | ||
}) | ||
childProcess.stderr?.on('data', (data) => { | ||
console.error(data.toString()) | ||
}) | ||
|
||
const onGoingOperationsMap = new Map< | ||
number, | ||
{ | ||
resolve: (value?: any) => void | ||
reject: (reason?: any) => void | ||
} | ||
>() | ||
|
||
function createOperation<T>() { | ||
const operationId = operationCounter | ||
operationCounter += 1 | ||
|
||
childProcess.on('exit', () => { | ||
reject(new Error('worker exited before returning result')) | ||
let promiseResolve, promiseReject | ||
const promise = new Promise<T>((innerResolve, innerReject) => { | ||
promiseResolve = innerResolve | ||
promiseReject = innerReject | ||
}) | ||
|
||
function resolve(value: T) { | ||
onGoingOperationsMap.delete(operationId) | ||
promiseResolve?.(value) | ||
} | ||
function reject(reason) { | ||
onGoingOperationsMap.delete(operationId) | ||
promiseReject?.(reason) | ||
} | ||
|
||
onGoingOperationsMap.set(operationId, { resolve, reject }) | ||
return { operationId, promise, resolve, reject } | ||
} | ||
|
||
childProcess.on('exit', () => { | ||
isRunning = false | ||
|
||
const error = new Error('worker exited before returning result') | ||
|
||
for (const { reject } of onGoingOperationsMap.values()) { | ||
reject(error) | ||
} | ||
}) | ||
|
||
function exit() { | ||
if (isRunning) { | ||
childProcess.send({ action: 'exit' }) | ||
} | ||
} | ||
|
||
// make sure to exit the child process when the test is done just in case | ||
ctx.cleanup?.push(async () => exit()) | ||
|
||
const { promise: loadPromise, resolve: loadResolve } = createOperation<void>() | ||
|
||
childProcess.on('message', (msg: any) => { | ||
if (msg?.action === 'invokeFunctionResult') { | ||
onGoingOperationsMap.get(msg.operationId)?.resolve(msg.result) | ||
} else if (msg?.action === 'loadedFunction') { | ||
loadResolve() | ||
} | ||
}) | ||
|
||
// context object is not serializable so we create serializable object | ||
// containing required properties to invoke lambda | ||
const serializableCtx = { | ||
functionDist: ctx.functionDist, | ||
blobStoreHost: ctx.blobStoreHost, | ||
siteID: ctx.siteID, | ||
deployID: ctx.deployID, | ||
} | ||
|
||
childProcess.send({ | ||
action: 'loadFunction', | ||
args: [serializableCtx], | ||
}) | ||
|
||
await loadPromise | ||
|
||
function invokeFunction(options: FunctionInvocationOptions): InvokeFunctionResult { | ||
if (!isRunning) { | ||
throw new Error('worker is not running anymore') | ||
} | ||
|
||
const { operationId, promise } = createOperation<Awaited<InvokeFunctionResult>>() | ||
|
||
childProcess.send({ | ||
action: 'invokeFunction', | ||
args: [ | ||
// context object is not serializable so we create serializable object | ||
// containing required properties to invoke lambda | ||
{ | ||
functionDist: ctx.functionDist, | ||
blobStoreHost: ctx.blobStoreHost, | ||
siteID: ctx.siteID, | ||
deployID: ctx.deployID, | ||
}, | ||
options, | ||
], | ||
operationId, | ||
args: [serializableCtx, options], | ||
}) | ||
}) | ||
|
||
return promise | ||
} | ||
|
||
return { | ||
invokeFunction, | ||
exit, | ||
} | ||
} | ||
|
||
/** | ||
* Load function in child process and execute single invocation | ||
*/ | ||
export async function invokeSandboxedFunction( | ||
ctx: FixtureTestContext, | ||
options: FunctionInvocationOptions = {}, | ||
) { | ||
const { invokeFunction, exit } = await loadSandboxedFunction(ctx, options) | ||
const result = await invokeFunction(options) | ||
exit() | ||
return result | ||
Comment on lines
+538
to
+548
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this ensures existing tests using |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,11 @@ import { loadFunction } from './lambda-helpers.mjs' | |
|
||
getLogger().level = 'alert' | ||
|
||
/** | ||
* @type {import('./lambda-helpers.mjs').InvokeFunction | undefined} | ||
*/ | ||
let invokeFunctionImpl | ||
|
||
process.on( | ||
'message', | ||
/** | ||
|
@@ -13,16 +18,29 @@ process.on( | |
async (msg) => { | ||
if (msg?.action === 'exit') { | ||
process.exit(0) | ||
} else if (msg?.action === 'loadFunction') { | ||
const [ctx, options] = msg.args | ||
|
||
invokeFunctionImpl = await loadFunction(ctx, options) | ||
|
||
if (process.send) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when is this false? is this expected? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typescript generally it won't exist in parent process or if there is no IPC bridge setup (this is to send IPC messages to parent process), but in our current usage it will actually always be defined |
||
process.send({ | ||
action: 'loadedFunction', | ||
}) | ||
} | ||
} else if (msg?.action === 'invokeFunction') { | ||
try { | ||
const [ctx, options] = msg.args | ||
|
||
const invokeFunctionImpl = await loadFunction(ctx, options) | ||
if (!invokeFunctionImpl) { | ||
throw new Error('Function not loaded') | ||
} | ||
const result = await invokeFunctionImpl(options) | ||
|
||
if (process.send) { | ||
process.send({ | ||
action: 'invokeFunctionResult', | ||
operationId: msg.operationId, | ||
result, | ||
}) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor fix here to ensure CWD in sandboxes matches what we already do when invoking in process (with cwd mock there)