Skip to content

Commit a5c8ea8

Browse files
authored
chore(cli-integ): ResourcePool will not reacquire dead processes (#24496)
The ResourcePool acquires any available region from a collection of regions, each protected by a mutex. The underlying mutex supports stealing locks from processes that have died, but the ResourcePool built on top never exercises that capability, because it will only try to acquire locks from mutexes it "knows" are free (i.e., for which it has seen an "unlock" event). Try every mutex on every acquisition. This is less efficient, but it has the advantage of recovering from processes that died. There is no test for this as the test is very awkward to write (depends on subprocesses being dead). Also fix a bug in XpMutex itself where `acquire()` was supposed to call `tryAcquire()`, but that was fat-fingered into a recursive call. ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent c02e52b commit a5c8ea8

File tree

3 files changed

+52
-10
lines changed

3 files changed

+52
-10
lines changed

packages/@aws-cdk-testing/cli-integ/lib/resource-pool.ts

+5-9
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,12 @@ export class ResourcePool<A extends string=string> {
4444
while (true) {
4545
// Start a wait on the unlock now -- if the unlock signal comes after
4646
// we try to acquire but before we start the wait, we might miss it.
47-
const wait = this.pool.awaitUnlock(5000);
47+
//
48+
// (The timeout is in case the unlock signal doesn't come for whatever reason).
49+
const wait = this.pool.awaitUnlock(10_000);
4850

49-
for (const res of this.unlockedResources()) {
51+
// Try all mutexes, we might need to reacquire an expired lock
52+
for (const res of this.resources) {
5053
const lease = await this.tryObtainLease(res);
5154
if (lease) {
5255
// Ignore the wait (count as handled)
@@ -107,13 +110,6 @@ export class ResourcePool<A extends string=string> {
107110
delete this.locks[value];
108111
await lock?.release();
109112
}
110-
111-
/**
112-
* Return all resources that we definitely don't own the locks for
113-
*/
114-
private unlockedResources(): A[] {
115-
return this.resources.filter(res => !this.locks[res]);
116-
}
117113
}
118114

119115
/**

packages/@aws-cdk-testing/cli-integ/lib/xpmutex.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ export class XpMutex {
141141
// signal due to unfortunate timing.
142142
const wait = this.pool.awaitUnlock(5000);
143143

144-
const lock = await this.acquire();
144+
const lock = await this.tryAcquire();
145145
if (lock) {
146146
// Ignore the wait (count as handled)
147147
wait.then(() => {}, () => {});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { XpMutexPool } from '../lib/xpmutex';
2+
3+
const POOL = XpMutexPool.fromName('test-pool');
4+
5+
test('acquire waits', async () => {
6+
const mux = POOL.mutex('testA');
7+
let secondLockAcquired = false;
8+
9+
// Current "process" acquires lock
10+
const lock = await mux.acquire();
11+
12+
// Start a second "process" that tries to acquire the lock
13+
const secondProcess = (async () => {
14+
const secondLock = await mux.acquire();
15+
try {
16+
secondLockAcquired = true;
17+
} finally {
18+
await secondLock.release();
19+
}
20+
})();
21+
22+
// Once we release the lock the second process is free to take it
23+
expect(secondLockAcquired).toBe(false);
24+
await lock.release();
25+
26+
// We expect the variable to become true
27+
await waitFor(() => secondLockAcquired);
28+
expect(secondLockAcquired).toBe(true);
29+
30+
await secondProcess;
31+
});
32+
33+
34+
/**
35+
* Poll for some condition every 10ms
36+
*/
37+
function waitFor(pred: () => boolean): Promise<void> {
38+
return new Promise((ok) => {
39+
const timerHandle = setInterval(() => {
40+
if (pred()) {
41+
clearInterval(timerHandle);
42+
ok();
43+
}
44+
}, 5);
45+
});
46+
}

0 commit comments

Comments
 (0)