Skip to content

Commit 5982a17

Browse files
GatsbyJS Botvladar
GatsbyJS Bot
andauthored
feat(gatsby): PQR: merge data dependencies from workers to the main process (#32305) (#32438)
* feat(gatsby): PQR: merge data dependencies from workers to the main process * Move worker state merging to a standalone step * revert acciental change * different approach: replay actions vs merging state * revert unneded changes * do not use inline snapshot * Update packages/gatsby/src/utils/worker/__tests__/queries.ts Co-authored-by: Lennart <[email protected]> Co-authored-by: Lennart <[email protected]> (cherry picked from commit bdb9352) Co-authored-by: Vladimir Razuvaev <[email protected]>
1 parent 7499b22 commit 5982a17

File tree

3 files changed

+197
-2
lines changed

3 files changed

+197
-2
lines changed

packages/gatsby/src/utils/worker/__tests__/queries.ts

+140-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import sourceNodesAndRemoveStaleNodes from "../../source-nodes"
77
import {
88
savePartialStateToDisk,
99
store,
10+
emitter,
1011
loadPartialStateFromDisk,
1112
} from "../../../redux"
1213
import { loadConfigAndPlugins } from "../../../bootstrap/load-config-and-plugins"
@@ -167,7 +168,6 @@ describeWhenLMDB(`worker (queries)`, () => {
167168
savePartialStateToDisk([`components`, `staticQueryComponents`])
168169

169170
await Promise.all(worker.all.buildSchema())
170-
await worker.single.runQueries(queryIdsSmall)
171171
})
172172

173173
afterAll(() => {
@@ -180,9 +180,15 @@ describeWhenLMDB(`worker (queries)`, () => {
180180
}
181181
})
182182

183+
// This was the original implementation of state syncing between a worker and the main process.
184+
// We switched to "replaying actions" as a mechanism for state syncing.
185+
// But we can get back to state saving / merging if "replaying actions" proves to be too expensive
186+
// TODO: delete or re-activate depending on results yielded by "replaying actions" approach.
187+
// The logic for `loadPartialStateFromDisk` itself is tested in `share-state` tests
183188
it(`should save worker "queries" state to disk`, async () => {
184189
if (!worker) fail(`worker not defined`)
185190

191+
await worker.single.runQueries(queryIdsSmall)
186192
await Promise.all(worker.all.saveQueries())
187193
// Pass "1" as workerId as the test only have one worker
188194
const result = loadPartialStateFromDisk([`queries`], `1`)
@@ -233,6 +239,8 @@ describeWhenLMDB(`worker (queries)`, () => {
233239

234240
it(`should execute static queries`, async () => {
235241
if (!worker) fail(`worker not defined`)
242+
243+
await worker.single.runQueries(queryIdsSmall)
236244
const stateFromWorker = await worker.single.getState()
237245

238246
const staticQueryResult = await fs.readJson(
@@ -250,6 +258,8 @@ describeWhenLMDB(`worker (queries)`, () => {
250258

251259
it(`should execute page queries`, async () => {
252260
if (!worker) fail(`worker not defined`)
261+
262+
await worker.single.runQueries(queryIdsSmall)
253263
const stateFromWorker = await worker.single.getState()
254264

255265
const pageQueryResult = await fs.readJson(
@@ -265,6 +275,8 @@ describeWhenLMDB(`worker (queries)`, () => {
265275

266276
it(`should execute page queries with context variables`, async () => {
267277
if (!worker) fail(`worker not defined`)
278+
279+
await worker.single.runQueries(queryIdsSmall)
268280
const stateFromWorker = await worker.single.getState()
269281

270282
const pageQueryResult = await fs.readJson(
@@ -331,4 +343,131 @@ describeWhenLMDB(`worker (queries)`, () => {
331343

332344
spy.mockRestore()
333345
})
346+
347+
it(`should return actions occurred in worker to replay in the main process`, async () => {
348+
const result = await worker.single.runQueries(queryIdsSmall)
349+
350+
const expectedActionShapes = {
351+
QUERY_START: [`componentPath`, `isPage`, `path`],
352+
PAGE_QUERY_RUN: [`componentPath`, `isPage`, `path`, `resultHash`],
353+
CREATE_COMPONENT_DEPENDENCY: [`nodeId`, `path`],
354+
ADD_PENDING_PAGE_DATA_WRITE: [`path`],
355+
}
356+
expect(result).toBeArrayOfSize(11)
357+
358+
for (const action of result) {
359+
expect(action.type).toBeOneOf(Object.keys(expectedActionShapes))
360+
expect(action.payload).toContainKeys(expectedActionShapes[action.type])
361+
}
362+
// Double-check that important actions are actually present
363+
expect(result).toContainValue(
364+
expect.objectContaining({ type: `QUERY_START` })
365+
)
366+
expect(result).toContainValue(
367+
expect.objectContaining({ type: `PAGE_QUERY_RUN` })
368+
)
369+
})
370+
371+
it(`should replay selected worker actions in runQueriesInWorkersQueue`, async () => {
372+
const expectedActions = [
373+
{
374+
payload: {
375+
componentPath: `/static-query-component.js`,
376+
isPage: false,
377+
path: `sq--q1`,
378+
},
379+
type: `QUERY_START`,
380+
},
381+
{
382+
payload: {
383+
nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`,
384+
path: `sq--q1`,
385+
},
386+
plugin: ``,
387+
type: `CREATE_COMPONENT_DEPENDENCY`,
388+
},
389+
{
390+
payload: {
391+
componentPath: `/static-query-component.js`,
392+
isPage: false,
393+
path: `sq--q1`,
394+
queryHash: `q1-hash`,
395+
resultHash: `Dr5hgCDB+R0S9oRBWeZYj3lB7VI=`,
396+
},
397+
type: `PAGE_QUERY_RUN`,
398+
},
399+
{
400+
payload: {
401+
componentPath: `/foo.js`,
402+
isPage: true,
403+
path: `/foo`,
404+
},
405+
type: `QUERY_START`,
406+
},
407+
{
408+
payload: {
409+
componentPath: `/bar.js`,
410+
isPage: true,
411+
path: `/bar`,
412+
},
413+
type: `QUERY_START`,
414+
},
415+
{
416+
payload: {
417+
nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`,
418+
path: `/foo`,
419+
},
420+
plugin: ``,
421+
type: `CREATE_COMPONENT_DEPENDENCY`,
422+
},
423+
{
424+
payload: {
425+
nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`,
426+
path: `/bar`,
427+
},
428+
plugin: ``,
429+
type: `CREATE_COMPONENT_DEPENDENCY`,
430+
},
431+
{
432+
payload: {
433+
path: `/foo`,
434+
},
435+
type: `ADD_PENDING_PAGE_DATA_WRITE`,
436+
},
437+
{
438+
payload: {
439+
componentPath: `/foo.js`,
440+
isPage: true,
441+
path: `/foo`,
442+
resultHash: `8dW7PoqwZNk/0U8LO6kTj1qBCwU=`,
443+
},
444+
type: `PAGE_QUERY_RUN`,
445+
},
446+
{
447+
payload: {
448+
path: `/bar`,
449+
},
450+
type: `ADD_PENDING_PAGE_DATA_WRITE`,
451+
},
452+
{
453+
payload: {
454+
componentPath: `/bar.js`,
455+
isPage: true,
456+
path: `/bar`,
457+
resultHash: `iKmhf9XgbsfK7qJw0tw95pmGwJM=`,
458+
},
459+
type: `PAGE_QUERY_RUN`,
460+
},
461+
]
462+
463+
const actualActions: Array<any> = []
464+
function listenActions(action): void {
465+
actualActions.push(action)
466+
}
467+
emitter.on(`*`, listenActions)
468+
await runQueriesInWorkersQueue(worker, queryIdsSmall)
469+
emitter.off(`*`, listenActions)
470+
471+
expect(actualActions).toContainAllValues(expectedActions)
472+
})
334473
})

packages/gatsby/src/utils/worker/child/queries.ts

+39-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ import { GraphQLRunner } from "../../../query/graphql-runner"
88
import { getDataStore } from "../../../datastore"
99
import { setState } from "./state"
1010
import { buildSchema } from "./schema"
11+
import {
12+
IAddPendingPageDataWriteAction,
13+
ICreatePageDependencyAction,
14+
IPageQueryRunAction,
15+
IQueryStartAction,
16+
} from "../../../redux/types"
1117

1218
export function setComponents(): void {
1319
setState([`components`, `staticQueryComponents`])
@@ -29,7 +35,39 @@ function getGraphqlRunner(): GraphQLRunner {
2935
return gqlRunner
3036
}
3137

32-
export async function runQueries(queryIds: IGroupedQueryIds): Promise<void> {
38+
type ActionsToReplay = Array<
39+
| IQueryStartAction
40+
| IPageQueryRunAction
41+
| IAddPendingPageDataWriteAction
42+
| ICreatePageDependencyAction
43+
>
44+
45+
export async function runQueries(
46+
queryIds: IGroupedQueryIds
47+
): Promise<ActionsToReplay> {
48+
const actionsToReplay: ActionsToReplay = []
49+
50+
const unsubscribe = store.subscribe(() => {
51+
const action = store.getState().lastAction
52+
if (
53+
action.type === `QUERY_START` ||
54+
action.type === `PAGE_QUERY_RUN` ||
55+
action.type === `ADD_PENDING_PAGE_DATA_WRITE` ||
56+
action.type === `CREATE_COMPONENT_DEPENDENCY`
57+
) {
58+
actionsToReplay.push(action)
59+
}
60+
})
61+
62+
try {
63+
await doRunQueries(queryIds)
64+
return actionsToReplay
65+
} finally {
66+
unsubscribe()
67+
}
68+
}
69+
70+
async function doRunQueries(queryIds: IGroupedQueryIds): Promise<void> {
3371
const workerStore = store.getState()
3472

3573
// If buildSchema() didn't run yet, execute it

packages/gatsby/src/utils/worker/pool.ts

+18
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import { initJobsMessagingInMainProcess } from "../jobs/worker-messaging"
88
import { initReporterMessagingInMainProcess } from "./reporter"
99

1010
import { GatsbyWorkerPool } from "./types"
11+
import { store } from "../../redux"
12+
import { ActionsUnion } from "../../redux/types"
1113

1214
export type { GatsbyWorkerPool }
1315

@@ -48,6 +50,7 @@ export async function runQueriesInWorkersQueue(
4850
promises.push(
4951
pool.single
5052
.runQueries({ pageQueryIds: [], staticQueryIds: segment })
53+
.then(replayWorkerActions)
5154
.then(() => {
5255
activity.tick(segment.length)
5356
})
@@ -58,6 +61,7 @@ export async function runQueriesInWorkersQueue(
5861
promises.push(
5962
pool.single
6063
.runQueries({ pageQueryIds: segment, staticQueryIds: [] })
64+
.then(replayWorkerActions)
6165
.then(() => {
6266
activity.tick(segment.length)
6367
})
@@ -68,3 +72,17 @@ export async function runQueriesInWorkersQueue(
6872

6973
activity.end()
7074
}
75+
76+
async function replayWorkerActions(
77+
actions: Array<ActionsUnion>
78+
): Promise<void> {
79+
let i = 1
80+
for (const action of actions) {
81+
store.dispatch(action)
82+
83+
// Give event loop some breath
84+
if (i++ % 100 === 0) {
85+
await new Promise(resolve => process.nextTick(resolve))
86+
}
87+
}
88+
}

0 commit comments

Comments
 (0)