Skip to content

Commit f2d4830

Browse files
authored
feat(gatsby-core-utils): create proper mutex (#34761)
1 parent 21ef185 commit f2d4830

File tree

11 files changed

+333
-26
lines changed

11 files changed

+333
-26
lines changed

docs/docs/how-to/testing/unit-testing.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ module.exports = {
4343
".+\\.(css|styl|less|sass|scss)$": `identity-obj-proxy`,
4444
".+\\.(jpg|jpeg|png|gif|eot|otf|webp|svg|ttf|woff|woff2|mp4|webm|wav|mp3|m4a|aac|oga)$": `<rootDir>/__mocks__/file-mock.js`,
4545
"^gatsby-page-utils/(.*)$": `gatsby-page-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
46+
"^gatsby-core-utils/(.*)$": `gatsby-core-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
4647
},
4748
testPathIgnorePatterns: [`node_modules`, `\\.cache`, `<rootDir>.*/public`],
4849
transformIgnorePatterns: [`node_modules/(?!(gatsby)/)`],

examples/using-jest/jest.config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module.exports = {
66
".+\\.(css|styl|less|sass|scss)$": `identity-obj-proxy`,
77
".+\\.(jpg|jpeg|png|gif|eot|otf|webp|svg|ttf|woff|woff2|mp4|webm|wav|mp3|m4a|aac|oga)$": `<rootDir>/__mocks__/file-mock.js`,
88
"^gatsby-page-utils/(.*)$": `gatsby-page-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
9+
"^gatsby-core-utils/(.*)$": `gatsby-core-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
910
},
1011
testPathIgnorePatterns: [`node_modules`, `.cache`],
1112
transformIgnorePatterns: [`node_modules/(?!(gatsby)/)`],

jest.config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ module.exports = {
4747
"^ordered-binary$": `<rootDir>/node_modules/ordered-binary/dist/index.cjs`,
4848
"^msgpackr$": `<rootDir>/node_modules/msgpackr/dist/node.cjs`,
4949
"^gatsby-page-utils/(.*)$": `gatsby-page-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
50+
"^gatsby-core-utils/(.*)$": `gatsby-core-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
5051
},
5152
snapshotSerializers: [`jest-serializer-path`],
5253
collectCoverageFrom: coverageDirs,

packages/gatsby-core-utils/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,20 @@ const requireUtil = createRequireFromPath("../src/utils/")
104104
requireUtil("./some-tool")
105105
// ...
106106
```
107+
108+
### Mutex
109+
110+
When working inside workers or async operations, you often want some kind of concurrency control so that a specific workload can only run one at a time. This is what a [Mutex](https://en.wikipedia.org/wiki/Mutual_exclusion) does.
111+
112+
With the following code, the guarded section is only executed by one caller at a time; other threads/async workloads wait until the current one is done. This is handy when several workloads write to the same file on disk.
113+
114+
```js
115+
const { createMutex } = require("gatsby-core-utils/mutex")
116+
117+
const mutex = createMutex("my-custom-mutex-key")
118+
await mutex.acquire()
119+
120+
await fs.writeFile("pathToFile", "my custom content")
121+
122+
await mutex.release()
123+
```

packages/gatsby-core-utils/package.json

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,18 @@
66
"gatsby",
77
"gatsby-core-utils"
88
],
9+
"exports": {
10+
".": "./dist/index.js",
11+
"./*": "./dist/*.js"
12+
},
13+
"typesVersions": {
14+
"*": {
15+
"*": [
16+
"dist/*.d.ts",
17+
"dist/index.d.ts"
18+
]
19+
}
20+
},
921
"author": "Ward Peeters <[email protected]>",
1022
"homepage": "https://github.com/gatsbyjs/gatsby/tree/master/packages/gatsby-core-utils#readme",
1123
"license": "MIT",
@@ -36,9 +48,12 @@
3648
"file-type": "^16.5.3",
3749
"fs-extra": "^10.0.0",
3850
"got": "^11.8.3",
51+
"import-from": "^4.0.0",
3952
"lock": "^1.1.0",
53+
"lmdb": "^2.1.7",
4054
"node-object-hash": "^2.3.10",
4155
"proper-lockfile": "^4.1.2",
56+
"resolve-from": "^5.0.0",
4257
"tmp": "^0.2.1",
4358
"xdg-basedir": "^4.0.0"
4459
},
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import path from "path"
2+
import { remove, mkdirp } from "fs-extra"
3+
import { createMutex } from "../mutex"
4+
import * as storage from "../utils/get-storage"
5+
6+
// Spy on `getDatabaseDir` so the suite below can redirect it (via
// `mockReturnValue`) to a test-local cache directory.
jest.spyOn(storage, `getDatabaseDir`)
7+
8+
/**
 * Returns a promise that resolves after `timeout` milliseconds.
 *
 * @param timeout Delay passed to `setTimeout`; defaults to 100.
 */
function sleep(timeout = 100): Promise<void> {
  return new Promise<void>(done => {
    setTimeout(done, timeout)
  })
}
11+
12+
/**
 * Test workload: takes the mutex, records `start <id>`, waits `waitTime`
 * milliseconds, records `stop <id>`, then releases the mutex.
 *
 * @param mutex Mutex guarding the workload
 * @param result Log the start/stop markers are appended to (mutated in place)
 * @param waitTime How long the workload holds the mutex, in milliseconds
 * @param id Label used in the recorded markers
 * @returns The same `result` array that was passed in
 */
async function doAsync(
  mutex: ReturnType<typeof createMutex>,
  result: Array<string> = [],
  waitTime: number,
  id: string
): Promise<Array<string>> {
  const record = (phase: string): void => {
    result.push(`${phase} ${id}`)
  }

  await mutex.acquire()
  record(`start`)
  await sleep(waitTime)
  record(`stop`)
  await mutex.release()

  return result
}
26+
27+
describe(`mutex`, () => {
  const cachePath = path.join(__dirname, `.cache`)

  // Redirect the storage layer to a throwaway directory next to this file.
  beforeAll(async () => {
    await mkdirp(cachePath)
    storage.getDatabaseDir.mockReturnValue(cachePath)
  })

  // Close the database before deleting the directory it lives in.
  afterAll(async () => {
    await storage.closeDatabase()
    await remove(cachePath)
  })

  it(`should only allow one action go through at the same time`, async () => {
    const mutex = createMutex(`test-key`, 300)

    const result: Array<string> = []

    // Keep a handle on the first workload and await it below, so a rejection
    // fails this test instead of becoming an unhandled promise rejection.
    const mutexPromise = doAsync(mutex, result, 50, `1`)
    // Yield once so workload 1 gets a chance to take the lock first.
    await sleep(0)
    await doAsync(mutex, result, 10, `2`)
    await mutexPromise

    expect(result).toMatchInlineSnapshot(`
      Array [
        "start 1",
        "stop 1",
        "start 2",
        "stop 2",
      ]
    `)
  })

  it(`should generate the same mutex if key are identical`, async () => {
    // Two separate handles created with the same key must exclude each other.
    const mutex1 = createMutex(`test-key`, 300)
    const mutex2 = createMutex(`test-key`, 300)

    const result: Array<string> = []

    const mutexPromise = doAsync(mutex1, result, 50, `1`)
    await sleep(0)
    await doAsync(mutex2, result, 10, `2`)
    await mutexPromise

    expect(result).toMatchInlineSnapshot(`
      Array [
        "start 1",
        "stop 1",
        "start 2",
        "stop 2",
      ]
    `)
  })

  it(`shouldn't wait if keys are different`, async () => {
    // Different keys are independent, so the shorter workload 2 is expected
    // to start and finish while workload 1 is still running.
    const mutex1 = createMutex(`test-key`, 300)
    const mutex2 = createMutex(`other-key`, 300)

    const result: Array<string> = []

    const mutexPromise = doAsync(mutex1, result, 50, `1`)
    await sleep(0)
    await doAsync(mutex2, result, 10, `2`)
    await mutexPromise

    expect(result).toMatchInlineSnapshot(`
      Array [
        "start 1",
        "start 2",
        "stop 2",
        "stop 1",
      ]
    `)
  })
})
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import { getStorage, LockStatus, getDatabaseDir } from "./utils/get-storage"
2+
3+
/**
 * Handle returned by `createMutex`. Both operations are asynchronous.
 */
interface IMutex {
  /** Resolves once the lock has been taken by this caller. */
  acquire: () => Promise<void>
  /** Resolves once the lock entry has been removed again. */
  release: () => Promise<void>
}
7+
8+
// Default polling interval (in ms) between re-checks of whether the mutex got released; the exact value is arbitrary
9+
const DEFAULT_MUTEX_INTERVAL = 3000
10+
11+
/**
 * Resolves once this caller managed to write the lock entry for `key`.
 *
 * Each attempt asks the mutex database to store `LockStatus.Locked` under
 * `key` only if no entry exists yet (`ifNoExists`). When the attempt reports
 * failure, the function waits `timeout` milliseconds and tries again. There is
 * no upper bound on the number of attempts.
 *
 * @param storage Databases returned by `getStorage`
 * @param key Key of the lock entry
 * @param timeout Delay between attempts, in milliseconds
 */
async function waitUntilUnlocked(
  storage: ReturnType<typeof getStorage>,
  key: string,
  timeout: number
): Promise<void> {
  for (;;) {
    const acquired = await storage.mutex.ifNoExists(key, () => {
      storage.mutex.put(key, LockStatus.Locked)
    })

    if (acquired) {
      return
    }

    // Somebody else holds the lock: pause before the next attempt.
    await new Promise<void>(retry => {
      setTimeout(retry, timeout)
    })
  }
}
30+
31+
/**
 * Creates a mutex, make sure to call `release` when you're done with it.
 *
 * The lock entry is stored under `<buildId>-<key>`, where the build id is read
 * from `global.__GATSBY` (empty string when it is not set). Mutexes created
 * with the same key therefore share one lock entry.
 *
 * @param {string} key A unique key
 * @param {number} timeout Delay in milliseconds between attempts to take the
 * lock while it is held elsewhere (defaults to `DEFAULT_MUTEX_INTERVAL`)
 */
export function createMutex(
  key: string,
  timeout = DEFAULT_MUTEX_INTERVAL
): IMutex {
  const storage = getStorage(getDatabaseDir())
  const buildId = global.__GATSBY?.buildId ?? ``
  const lockKey = `${buildId}-${key}`

  function acquire(): Promise<void> {
    return waitUntilUnlocked(storage, lockKey, timeout)
  }

  async function release(): Promise<void> {
    await storage.mutex.remove(lockKey)
  }

  return { acquire, release }
}
52+
53+
/**
 * Clears the whole mutex database, removing every stored lock entry.
 */
export async function releaseAllMutexes(): Promise<void> {
  const { mutex } = getStorage(getDatabaseDir())

  await mutex.clearAsync()
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import path from "path"
2+
import importFrom from "import-from"
3+
import resolveFrom from "resolve-from"
4+
5+
/**
 * Returns the `lmdb` module, preferring the copy that the project's `gatsby`
 * installation resolves to, and falling back to the copy resolvable from this
 * package.
 *
 * Locating `gatsby/package.json` happens inside the `try`, because
 * `resolveFrom` throws when the module cannot be found from `process.cwd()`.
 * Without that, a missing gatsby would make this function throw instead of
 * using the fallback.
 */
export function getLmdb(): typeof import("lmdb") {
  // Try to use lmdb from gatsby if not we use our own version
  try {
    const gatsbyPkgRoot = path.dirname(
      resolveFrom(process.cwd(), `gatsby/package.json`)
    )

    return importFrom(gatsbyPkgRoot, `lmdb`) as typeof import("lmdb")
  } catch (err) {
    return require(`lmdb`)
  }
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import path from "path"
2+
import { getLmdb } from "./get-lmdb"
3+
import type { RootDatabase, Database } from "lmdb"
4+
5+
/**
 * Values stored in the mutex database.
 *
 * The numeric values are written to disk, so they are spelled out explicitly.
 */
export enum LockStatus {
  /** Stored under a mutex key when the lock is taken. */
  Locked = 0,
  /**
   * NOTE(review): not written by the mutex code shown with this file, which
   * removes the key on release instead. Confirm whether it is used elsewhere.
   */
  Unlocked = 1,
}
9+
10+
/** Named sub-databases that `getStorage` opens inside the root database. */
interface ICoreUtilsDatabase {
  /** Sub-database named `mutex`: string keys mapped to a `LockStatus`. */
  mutex: Database<LockStatus, string>
}
13+
14+
let databases: ICoreUtilsDatabase | undefined
15+
let rootDb: RootDatabase
16+
17+
/**
 * Builds the directory used for this package's database:
 * `<root>/.cache/data/gatsby-core-utils`, where `<root>` is
 * `global.__GATSBY.root` when set and `process.cwd()` otherwise.
 */
export function getDatabaseDir(): string {
  const segments = [`.cache`, `data`, `gatsby-core-utils`]
  const baseDir = global.__GATSBY?.root ?? process.cwd()

  return path.join(baseDir, ...segments)
}
21+
22+
/**
 * Returns the databases of this package, opening them on first use.
 *
 * Lookup order:
 * 1. the module-level cache (`databases`), which is returned regardless of
 *    the `fullDbPath` argument once it has been filled;
 * 2. the process-wide `globalThis.__GATSBY_OPEN_LMDBS` map, keyed by path;
 * 3. otherwise a new root database is opened at `fullDbPath` and registered
 *    in both caches.
 *
 * @param fullDbPath Directory of the LMDB database
 * @throws Error when nothing is cached yet and `fullDbPath` is empty
 */
export function getStorage(fullDbPath: string): ICoreUtilsDatabase {
  if (databases) {
    return databases
  }

  if (!fullDbPath) {
    throw new Error(`LMDB path is not set!`)
  }

  // __GATSBY_OPEN_LMDBS tracks if we already opened given db in this process
  // In `gatsby serve` case we might try to open it twice - once for engines
  // and second to get access to `SitePage` nodes (to power trailing slashes
  // redirect middleware). This ensure there is single instance within a process.
  // Using more instances seems to cause weird random errors.
  if (!globalThis.__GATSBY_OPEN_LMDBS) {
    globalThis.__GATSBY_OPEN_LMDBS = new Map()
  }

  databases = globalThis.__GATSBY_OPEN_LMDBS.get(fullDbPath)
  if (databases) {
    return databases
  }

  // Nothing registered for this path yet: open the root database and the
  // `mutex` sub-database inside it.
  const { open } = getLmdb()
  rootDb = open({
    name: `root`,
    path: fullDbPath,
    compression: true,
    sharedStructuresKey: Symbol.for(`structures`),
  })

  const opened: ICoreUtilsDatabase = {
    mutex: rootDb.openDB({
      name: `mutex`,
    }),
  }
  databases = opened
  globalThis.__GATSBY_OPEN_LMDBS.set(fullDbPath, opened)

  return opened
}
63+
64+
/**
 * Closes the root database opened by `getStorage`, if there is one, and
 * forgets the cached handles that belonged to it.
 *
 * Dropping the module-level cache and the matching entries of
 * `globalThis.__GATSBY_OPEN_LMDBS` makes a later `getStorage` call open the
 * database again instead of handing out handles to a closed database.
 */
export async function closeDatabase(): Promise<void> {
  if (!rootDb) {
    return
  }

  await rootDb.close()

  const closed = databases
  databases = undefined

  // Only remove the entries that point at the handles we just closed; other
  // entries in the shared map are left alone.
  const openLmdbs = globalThis.__GATSBY_OPEN_LMDBS
  if (closed && openLmdbs) {
    for (const [dbPath, opened] of openLmdbs) {
      if (opened === closed) {
        openLmdbs.delete(dbPath)
      }
    }
  }
}

packages/gatsby/src/services/initialize.ts

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import _ from "lodash"
22
import { slash, isCI } from "gatsby-core-utils"
3+
import { releaseAllMutexes } from "gatsby-core-utils/mutex"
34
import fs from "fs-extra"
45
import md5File from "md5-file"
56
import crypto from "crypto"
@@ -412,34 +413,29 @@ export async function initialize({
412413
// }
413414
// }
414415

415-
if (
416-
process.env.GATSBY_EXPERIMENTAL_PRESERVE_FILE_DOWNLOAD_CACHE ||
417-
process.env.GATSBY_EXPERIMENTAL_PRESERVE_WEBPACK_CACHE
418-
) {
419-
const deleteGlobs = [
420-
// By default delete all files & subdirectories
421-
`${cacheDirectory}/**`,
422-
`${cacheDirectory}/*/`,
423-
]
424-
425-
if (process.env.GATSBY_EXPERIMENTAL_PRESERVE_FILE_DOWNLOAD_CACHE) {
426-
// Stop the caches directory from being deleted, add all sub directories,
427-
// but remove gatsby-source-filesystem
428-
deleteGlobs.push(`!${cacheDirectory}/caches`)
429-
deleteGlobs.push(`${cacheDirectory}/caches/*`)
430-
deleteGlobs.push(`!${cacheDirectory}/caches/gatsby-source-filesystem`)
431-
}
416+
const deleteGlobs = [
417+
// By default delete all files & subdirectories
418+
`${cacheDirectory}/**`,
419+
`!${cacheDirectory}/data`,
420+
`${cacheDirectory}/data/**`,
421+
`!${cacheDirectory}/data/gatsby-core-utils/`,
422+
`!${cacheDirectory}/data/gatsby-core-utils/**`,
423+
]
424+
425+
if (process.env.GATSBY_EXPERIMENTAL_PRESERVE_FILE_DOWNLOAD_CACHE) {
426+
// Stop the caches directory from being deleted, add all sub directories,
427+
// but remove gatsby-source-filesystem
428+
deleteGlobs.push(`!${cacheDirectory}/caches`)
429+
deleteGlobs.push(`${cacheDirectory}/caches/*`)
430+
deleteGlobs.push(`!${cacheDirectory}/caches/gatsby-source-filesystem`)
431+
}
432432

433-
if (process.env.GATSBY_EXPERIMENTAL_PRESERVE_WEBPACK_CACHE) {
434-
// Add webpack
435-
deleteGlobs.push(`!${cacheDirectory}/webpack`)
436-
}
437-
await del(deleteGlobs)
438-
} else {
439-
// Attempt to empty dir if remove fails,
440-
// like when directory is mount point
441-
await fs.remove(cacheDirectory).catch(() => fs.emptyDir(cacheDirectory))
433+
if (process.env.GATSBY_EXPERIMENTAL_PRESERVE_WEBPACK_CACHE) {
434+
// Add webpack
435+
deleteGlobs.push(`!${cacheDirectory}/webpack`)
442436
}
437+
438+
await del(deleteGlobs)
443439
} catch (e) {
444440
reporter.error(`Failed to remove .cache files.`, e)
445441
}
@@ -450,6 +446,9 @@ export async function initialize({
450446
cacheIsCorrupt,
451447
})
452448

449+
// make sure all previous mutexes are released
450+
await releaseAllMutexes()
451+
453452
// in future this should show which plugin's caches are purged
454453
// possibly should also have which plugins had caches
455454
telemetry.decorateEvent(`BUILD_END`, {

0 commit comments

Comments
 (0)