6
6
* found in the LICENSE file at https://angular.io/license
7
7
*/
8
8
import { analytics , experimental , json , logging } from '@angular-devkit/core' ;
9
- import { Observable , from , of } from 'rxjs' ;
10
- import { concatMap , first , map , shareReplay , switchMap } from 'rxjs/operators' ;
9
+ import { Observable , from , merge , of , onErrorResumeNext } from 'rxjs' ;
10
+ import {
11
+ concatMap ,
12
+ first ,
13
+ ignoreElements ,
14
+ last ,
15
+ map ,
16
+ shareReplay ,
17
+ switchMap ,
18
+ takeUntil ,
19
+ } from 'rxjs/operators' ;
11
20
import {
12
21
BuilderInfo ,
13
22
BuilderInput ,
@@ -39,7 +48,8 @@ function _createJobHandlerFromBuilderInfo(
39
48
} ;
40
49
41
50
function handler ( argument : json . JsonObject , context : experimental . jobs . JobHandlerContext ) {
42
- const inboundBus = context . inboundBus . pipe (
51
+ // Add input validation to the inbound bus.
52
+ const inboundBusWithInputValidation = context . inboundBus . pipe (
43
53
concatMap ( message => {
44
54
if ( message . kind === experimental . jobs . JobInboundMessageKind . Input ) {
45
55
const v = message . value as BuilderInput ;
@@ -51,14 +61,12 @@ function _createJobHandlerFromBuilderInfo(
51
61
// Validate v against the options schema.
52
62
return registry . compile ( info . optionSchema ) . pipe (
53
63
concatMap ( validation => validation ( options ) ) ,
54
- map ( result => {
55
- if ( result . success ) {
56
- return { ...v , options : result . data } as BuilderInput ;
57
- } else if ( result . errors ) {
58
- throw new json . schema . SchemaValidationException ( result . errors ) ;
59
- } else {
60
- return v ;
64
+ map ( ( { data, success, errors } ) => {
65
+ if ( success ) {
66
+ return { ...v , options : data } as BuilderInput ;
61
67
}
68
+
69
+ throw new json . schema . SchemaValidationException ( errors ) ;
62
70
} ) ,
63
71
map ( value => ( { ...message , value } ) ) ,
64
72
) ;
@@ -71,7 +79,11 @@ function _createJobHandlerFromBuilderInfo(
71
79
shareReplay ( 1 ) ,
72
80
) ;
73
81
74
- return from ( host . loadBuilder ( info ) ) . pipe (
82
+ // Make an inboundBus that completes instead of erroring out.
83
+ // We'll merge the errors into the output instead.
84
+ const inboundBus = onErrorResumeNext ( inboundBusWithInputValidation ) ;
85
+
86
+ const output = from ( host . loadBuilder ( info ) ) . pipe (
75
87
concatMap ( builder => {
76
88
if ( builder === null ) {
77
89
throw new Error ( `Cannot load builder for builderInfo ${ JSON . stringify ( info , null , 2 ) } ` ) ;
@@ -94,7 +106,19 @@ function _createJobHandlerFromBuilderInfo(
94
106
} ) ,
95
107
) ;
96
108
} ) ,
109
+ // Share subscriptions to the output, otherwise the the handler will be re-run.
110
+ shareReplay ( ) ,
97
111
) ;
112
+
113
+ // Separate the errors from the inbound bus into their own observable that completes when the
114
+ // builder output does.
115
+ const inboundBusErrors = inboundBusWithInputValidation . pipe (
116
+ ignoreElements ( ) ,
117
+ takeUntil ( onErrorResumeNext ( output . pipe ( last ( ) ) ) ) ,
118
+ ) ;
119
+
120
+ // Return the builder output plus any input errors.
121
+ return merge ( inboundBusErrors , output ) ;
98
122
}
99
123
100
124
return of ( Object . assign ( handler , { jobDescription } ) as BuilderJobHandler ) ;
@@ -281,9 +305,9 @@ function _validateOptionsFactory(host: ArchitectHost, registry: json.schema.Sche
281
305
switchMap ( ( { data, success, errors } ) => {
282
306
if ( success ) {
283
307
return of ( data as json . JsonObject ) ;
284
- } else {
285
- throw new json . schema . SchemaValidationException ( errors ) ;
286
308
}
309
+
310
+ throw new json . schema . SchemaValidationException ( errors ) ;
287
311
} ) ,
288
312
) . toPromise ( ) ;
289
313
} ,
0 commit comments