Skip to content

Commit 0e6384b

Browse files
hanslalexeagle
authored andcommitted
fix(@angular-devkit/core): Properly subscribe/unsubscribe to observables
This has been a source of headaches when debugging the Architect jobs. We keep all subscriptions now and unsubscribe them when the result is done.
1 parent ef93ff8 commit 0e6384b

File tree

1 file changed

+32
-35
lines changed

1 file changed

+32
-35
lines changed

packages/angular_devkit/core/src/experimental/jobs/create-job-handler.ts

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* found in the LICENSE file at https://angular.io/license
77
*
88
*/
9-
import { Observable, Observer, Subject, Subscription, from, isObservable } from 'rxjs';
9+
import { Observable, Observer, Subject, Subscription, from, isObservable, of } from 'rxjs';
1010
import { switchMap, tap } from 'rxjs/operators';
1111
import { BaseException } from '../../exception/index';
1212
import { JsonValue } from '../../json/index';
@@ -72,8 +72,17 @@ export function createJobHandler<A extends JsonValue, I extends JsonValue, O ext
7272
let subscription: Subscription;
7373

7474
return new Observable<JobOutboundMessage<O>>(subject => {
75+
function complete() {
76+
if (subscription) {
77+
subscription.unsubscribe();
78+
}
79+
subject.next({ kind: JobOutboundMessageKind.End, description });
80+
subject.complete();
81+
inputChannel.complete();
82+
}
83+
7584
// Handle input.
76-
inboundBus.subscribe(message => {
85+
const inboundSub = inboundBus.subscribe(message => {
7786
switch (message.kind) {
7887
case JobInboundMessageKind.Ping:
7988
subject.next({ kind: JobOutboundMessageKind.Pong, description, id: message.id });
@@ -82,13 +91,7 @@ export function createJobHandler<A extends JsonValue, I extends JsonValue, O ext
8291
case JobInboundMessageKind.Stop:
8392
// There's no way to cancel a promise or a synchronous function, but we do cancel
8493
// observables where possible.
85-
if (subscription) {
86-
subscription.unsubscribe();
87-
}
88-
subject.next({ kind: JobOutboundMessageKind.End, description });
89-
subject.complete();
90-
// Close all channels.
91-
channels.forEach(x => x.complete());
94+
complete();
9295
break;
9396

9497
case JobInboundMessageKind.Input:
@@ -99,7 +102,7 @@ export function createJobHandler<A extends JsonValue, I extends JsonValue, O ext
99102

100103
// Configure a logger to pass in as additional context.
101104
const logger = new Logger('job');
102-
logger.subscribe(entry => {
105+
const logSub = logger.subscribe(entry => {
103106
subject.next({
104107
kind: JobOutboundMessageKind.Log,
105108
description,
@@ -108,8 +111,6 @@ export function createJobHandler<A extends JsonValue, I extends JsonValue, O ext
108111
});
109112

110113
// Execute the function with the additional context.
111-
subject.next({ kind: JobOutboundMessageKind.Start, description });
112-
113114
const channels = new Map<string, Subject<JsonValue>>();
114115

115116
const newContext: SimpleJobHandlerContext<A, I, O> = {
@@ -121,7 +122,7 @@ export function createJobHandler<A extends JsonValue, I extends JsonValue, O ext
121122
throw new ChannelAlreadyExistException(name);
122123
}
123124
const channelSubject = new Subject<JsonValue>();
124-
channelSubject.subscribe(
125+
const channelSub = channelSubject.subscribe(
125126
message => {
126127
subject.next({
127128
kind: JobOutboundMessageKind.ChannelMessage, description, name, message,
@@ -140,36 +141,32 @@ export function createJobHandler<A extends JsonValue, I extends JsonValue, O ext
140141
);
141142

142143
channels.set(name, channelSubject);
144+
if (subscription) {
145+
subscription.add(channelSub);
146+
}
143147

144148
return channelSubject;
145149
},
146150
};
147151

148-
const result = fn(argument, newContext);
152+
subject.next({ kind: JobOutboundMessageKind.Start, description });
153+
let result = fn(argument, newContext);
149154
// If the result is a promise, simply wait for it to complete before reporting the result.
150155
if (isPromise(result)) {
151-
result.then(result => {
152-
subject.next({ kind: JobOutboundMessageKind.Output, description, value: result });
153-
subject.next({ kind: JobOutboundMessageKind.End, description });
154-
subject.complete();
155-
}, err => subject.error(err));
156-
} else if (isObservable(result)) {
157-
subscription = (result as Observable<O>).subscribe(
158-
(value: O) => subject.next({ kind: JobOutboundMessageKind.Output, description, value }),
159-
error => subject.error(error),
160-
() => {
161-
subject.next({ kind: JobOutboundMessageKind.End, description });
162-
subject.complete();
163-
},
164-
);
165-
166-
return subscription;
167-
} else {
168-
// If it's a scalar value, report it synchronously.
169-
subject.next({ kind: JobOutboundMessageKind.Output, description, value: result as O });
170-
subject.next({ kind: JobOutboundMessageKind.End, description });
171-
subject.complete();
156+
result = from(result);
157+
} else if (!isObservable(result)) {
158+
result = of(result as O);
172159
}
160+
161+
subscription = (result as Observable<O>).subscribe(
162+
(value: O) => subject.next({ kind: JobOutboundMessageKind.Output, description, value }),
163+
error => subject.error(error),
164+
() => complete(),
165+
);
166+
subscription.add(inboundSub);
167+
subscription.add(logSub);
168+
169+
return subscription;
173170
});
174171
};
175172

0 commit comments

Comments
 (0)