This repository was archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathconcat.js
68 lines (58 loc) · 2.3 KB
/
concat.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
 * Observer attached to one source at a time while concatenating.
 * Values and errors are passed straight through to the downstream observer
 * held on the shared state (`state.o`). When the source completes, the
 * state's index (`state.i`) is advanced and the supplied callback is invoked
 * with the state.
 * NOTE(review): next/error/completed are presumably driven by the
 * AbstractObserver base class, which is defined outside this view.
 */
var ConcatObserver = (function (__super__) {
  inherits(ConcatObserver, __super__);

  /**
   * @param {Object} state Shared concat state; this observer reads `o` and updates `i`.
   * @param {Function} advance Callback invoked with `state` once the current source completes.
   */
  function ConcatObserver(state, advance) {
    this._s = state;
    this._fn = advance;
    __super__.call(this);
  }

  // Taken after inherits() has run, so methods land on the final prototype object.
  var proto = ConcatObserver.prototype;

  // Forward a value to the downstream observer.
  proto.next = function (value) {
    var downstream = this._s.o;
    downstream.onNext(value);
  };

  // Forward an error to the downstream observer.
  proto.error = function (err) {
    var downstream = this._s.o;
    downstream.onError(err);
  };

  // Completion is not forwarded: bump the source index and hand control back.
  proto.completed = function () {
    var state = this._s;
    state.i++;
    this._fn(state);
  };

  return ConcatObserver;
}(AbstractObserver));
/**
 * Observable that subscribes to each entry of `sources` in order, moving to
 * the next entry when the current one completes, and completing downstream
 * once the index reaches the end of the list.
 */
var ConcatObservable = (function (__super__) {
  inherits(ConcatObservable, __super__);

  /**
   * @param {Array} sources Entries to concatenate; an entry for which
   * `isPromise` is truthy is converted with `observableFromPromise` before subscribing.
   */
  function ConcatObservable(sources) {
    this._sources = sources;
    __super__.call(this);
  }

  /**
   * One step of the recursive schedule: subscribe to the source at `state.i`.
   * @param {Object} state Shared state created in `subscribeCore`.
   * @param {Function} recurse Callback given to the ConcatObserver, which calls
   * it with `state` when the current source completes.
   */
  function subscribeToCurrent(state, recurse) {
    // Nothing to do once the marker disposable reports it has been disposed.
    if (state.disposable.isDisposed) { return; }

    var sources = state.sources;
    // Index past the last source: every source has completed.
    if (state.i === sources.length) { return state.o.onCompleted(); }

    var source = sources[state.i];
    if (isPromise(source)) {
      source = observableFromPromise(source);
    }

    // Install the slot on the serial disposable before subscribing; the slot
    // receives the inner subscription as soon as subscribe() returns.
    var inner = new SingleAssignmentDisposable();
    state.subscription.setDisposable(inner);
    inner.setDisposable(source.subscribe(new ConcatObserver(state, recurse)));
  }

  /**
   * Builds the shared state and starts the recursive schedule on the immediate scheduler.
   * @param {Observer} o Downstream observer.
   * @returns {NAryDisposable} Wraps the serial subscription, the marker
   * disposable and the scheduler's cancelable.
   */
  ConcatObservable.prototype.subscribeCore = function (o) {
    var state = {
      o: o,
      i: 0,
      subscription: new SerialDisposable(),
      disposable: disposableCreate(noop),
      sources: this._sources
    };

    var cancelable = immediateScheduler.scheduleRecursive(state, subscribeToCurrent);

    return new NAryDisposable([state.subscription, state.disposable, cancelable]);
  };

  return ConcatObservable;
}(ObservableBase));
/**
 * Concatenates all the given sequences into a single observable sequence.
 * Accepts either one array of sources or the sources as individual arguments.
 * When the first argument is an array it is used as the source list directly
 * (it is not copied) and any further arguments are ignored.
 * @param {Array | Arguments} args Arguments or an array to concat to the observable sequence.
 * @returns {Observable} An observable sequence that contains the elements of each given sequence, in sequential order.
 */
var observableConcat = Observable.concat = function () {
  var first = arguments[0];
  if (Array.isArray(first)) {
    return new ConcatObservable(first);
  }

  // Copy the arguments object into a real array, slot by slot.
  var count = arguments.length;
  var sources = new Array(count);
  for (var idx = 0; idx < count; idx++) {
    sources[idx] = arguments[idx];
  }
  return new ConcatObservable(sources);
};