Skip to content

Commit 988835d

Browse files
hanslalexeagle
authored andcommitted
feat(@angular-devkit/core): add a reuse JobStrategy
It allows a job to be reused if it's still running. This includes redirecting the inputs to the new job.
1 parent 4571197 commit 988835d

File tree

2 files changed

+134
-3
lines changed

2 files changed

+134
-3
lines changed

packages/angular_devkit/core/src/experimental/jobs/strategy.ts

+67-3
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,16 @@
55
* Use of this source code is governed by an MIT-style license that can be
66
* found in the LICENSE file at https://angular.io/license
77
*/
8-
import { Observable, concat, of } from 'rxjs';
9-
import { ignoreElements, share, shareReplay } from 'rxjs/operators';
8+
import { Observable, Subject, concat, of } from 'rxjs';
9+
import { finalize, ignoreElements, share, shareReplay, tap } from 'rxjs/operators';
1010
import { JsonValue } from '../../json';
11-
import { JobDescription, JobHandler, JobHandlerContext, JobOutboundMessage } from './api';
11+
import {
12+
JobDescription,
13+
JobHandler,
14+
JobHandlerContext, JobInboundMessage,
15+
JobOutboundMessage,
16+
JobOutboundMessageKind,
17+
} from './api';
1218

1319
const stableStringify = require('fast-json-stable-stringify');
1420

@@ -49,6 +55,64 @@ export namespace strategy {
4955
}
5056

5157

58+
/**
59+
* Creates a JobStrategy that will always reuse a running job, and restart it if the job ended.
60+
* @param replayMessages Replay ALL messages if a job is reused, otherwise just hook up where it
61+
* is.
62+
*/
63+
export function reuse<
64+
A extends JsonValue = JsonValue,
65+
I extends JsonValue = JsonValue,
66+
O extends JsonValue = JsonValue,
67+
>(replayMessages = false): JobStrategy<A, I, O> {
68+
let inboundBus = new Subject<JobInboundMessage<I>>();
69+
let runContext: JobHandlerContext | null = null;
70+
let run: Observable<JobOutboundMessage<O>> | null = null;
71+
let state: JobOutboundMessage<O> | null = null;
72+
73+
return (handler, options) => {
74+
const newHandler = (argument: A, context: JobHandlerContext<A, I, O>) => {
75+
// Forward inputs.
76+
const subscription = context.inboundBus.subscribe(inboundBus);
77+
78+
if (run) {
79+
return concat(
80+
// Update state.
81+
of(state),
82+
run,
83+
).pipe(
84+
finalize(() => subscription.unsubscribe()),
85+
);
86+
}
87+
88+
run = handler(argument, { ...context, inboundBus: inboundBus.asObservable() }).pipe(
89+
tap(
90+
message => {
91+
if (message.kind == JobOutboundMessageKind.Start
92+
|| message.kind == JobOutboundMessageKind.OnReady
93+
|| message.kind == JobOutboundMessageKind.End) {
94+
state = message;
95+
}
96+
},
97+
undefined,
98+
() => {
99+
subscription.unsubscribe();
100+
inboundBus = new Subject<JobInboundMessage<I>>();
101+
run = null;
102+
},
103+
),
104+
replayMessages ? shareReplay() : share(),
105+
);
106+
runContext = context;
107+
108+
return run;
109+
};
110+
111+
return Object.assign(newHandler, handler, options || {});
112+
};
113+
}
114+
115+
52116
/**
53117
* Creates a JobStrategy that will reuse a running job if the argument matches.
54118
* @param replayMessages Replay ALL messages if a job is reused, otherwise just hook up where it

packages/angular_devkit/core/src/experimental/jobs/strategy_spec.ts

+67
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,73 @@ describe('strategy.serialize()', () => {
131131
});
132132
});
133133

134+
describe('strategy.reuse()', () => {
135+
let registry: SimpleJobRegistry;
136+
let scheduler: SimpleScheduler;
137+
138+
beforeEach(() => {
139+
registry = new SimpleJobRegistry();
140+
scheduler = new SimpleScheduler(registry);
141+
});
142+
143+
it('works', async () => {
144+
let started = 0;
145+
let finished = 0;
146+
147+
registry.register(strategy.reuse()(createJobHandler((input: number[]) => {
148+
started++;
149+
150+
return new Promise<number>(
151+
resolve => setTimeout(() => {
152+
finished++;
153+
resolve(input.reduce((a, c) => a + c, 0));
154+
}, 10),
155+
);
156+
})), {
157+
argument: { items: { type: 'number' } },
158+
output: { type: 'number' },
159+
name: 'add',
160+
});
161+
162+
const job1 = await scheduler.schedule('add', [1, 2, 3, 4]);
163+
const job2 = await scheduler.schedule('add', []);
164+
expect(started).toBe(0);
165+
expect(finished).toBe(0);
166+
167+
job1.output.subscribe();
168+
expect(started).toBe(1);
169+
expect(finished).toBe(0);
170+
171+
job2.output.subscribe();
172+
expect(started).toBe(1); // job2 is reusing job1.
173+
expect(finished).toBe(0);
174+
175+
let result = await job1.output.toPromise();
176+
expect(result).toBe(10);
177+
expect(started).toBe(1);
178+
expect(finished).toBe(1);
179+
expect(job1.state).toBe(JobState.Ended);
180+
expect(job2.state).toBe(JobState.Ended);
181+
182+
const job3 = await scheduler.schedule('add', [1, 2, 3, 4, 5]);
183+
const job4 = await scheduler.schedule('add', []);
184+
job3.output.subscribe();
185+
expect(started).toBe(2);
186+
expect(finished).toBe(1);
187+
188+
job4.output.subscribe();
189+
expect(started).toBe(2); // job4 is reusing job3.
190+
expect(finished).toBe(1);
191+
192+
result = await job3.output.toPromise();
193+
expect(result).toBe(15);
194+
expect(started).toBe(2);
195+
expect(finished).toBe(2);
196+
expect(job3.state).toBe(JobState.Ended);
197+
expect(job4.state).toBe(JobState.Ended);
198+
});
199+
});
200+
134201
describe('strategy.memoize()', () => {
135202
let registry: SimpleJobRegistry;
136203
let scheduler: SimpleScheduler;

0 commit comments

Comments
 (0)