Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit 871f6d3

Browse files
repeatWhen
1 parent 5290a1b commit 871f6d3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+3007
-673
lines changed

Gruntfile.js

+6
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ module.exports = function (grunt) {
233233
'src/core/linq/observable/repeatproto.js',
234234
'src/core/linq/observable/retry.js',
235235
'src/core/linq/observable/retrywhen.js',
236+
'src/core/linq/observable/repeatwhen.js',
236237
'src/core/perf/operators/scan.js',
237238
'src/core/linq/observable/skiplast.js',
238239
'src/core/linq/observable/startwith.js',
@@ -528,6 +529,7 @@ module.exports = function (grunt) {
528529
'src/core/linq/observable/repeatproto.js',
529530
'src/core/linq/observable/retry.js',
530531
'src/core/linq/observable/retrywhen.js',
532+
'src/core/linq/observable/repeatwhen.js',
531533
'src/core/perf/operators/scan.js',
532534
'src/core/linq/observable/skiplast.js',
533535
'src/core/linq/observable/startwith.js',
@@ -823,6 +825,7 @@ module.exports = function (grunt) {
823825
'src/core/linq/observable/repeatproto.js',
824826
'src/core/linq/observable/retry.js',
825827
'src/core/linq/observable/retrywhen.js',
828+
'src/core/linq/observable/repeatwhen.js',
826829
'src/core/perf/operators/scan.js',
827830
'src/core/linq/observable/skiplast.js',
828831
'src/core/linq/observable/startwith.js',
@@ -973,6 +976,7 @@ module.exports = function (grunt) {
973976
'src/core/linq/observable/repeatproto.js',
974977
'src/core/linq/observable/retry.js',
975978
'src/core/linq/observable/retrywhen.js',
979+
'src/core/linq/observable/repeatwhen.js',
976980
'src/core/perf/operators/scan.js',
977981
'src/core/linq/observable/skiplast.js',
978982
'src/core/linq/observable/startwith.js',
@@ -1104,6 +1108,7 @@ module.exports = function (grunt) {
11041108
'src/core/linq/observable/repeatproto.js',
11051109
'src/core/linq/observable/retry.js',
11061110
'src/core/linq/observable/retrywhen.js',
1111+
'src/core/linq/observable/repeatwhen.js',
11071112
'src/core/perf/operators/scan.js',
11081113
'src/core/linq/observable/skiplast.js',
11091114
'src/core/linq/observable/startwith.js',
@@ -1275,6 +1280,7 @@ module.exports = function (grunt) {
12751280
'src/core/linq/observable/repeatproto.js',
12761281
'src/core/linq/observable/retry.js',
12771282
'src/core/linq/observable/retrywhen.js',
1283+
'src/core/linq/observable/repeatwhen.js',
12781284
'src/core/perf/operators/scan.js',
12791285
'src/core/linq/observable/skiplast.js',
12801286
'src/core/linq/observable/startwith.js',

dist/rx.all.compat.js

+177-65
Original file line numberDiff line numberDiff line change
@@ -2617,59 +2617,6 @@ var FlatMapObservable = Rx.FlatMapObservable = (function(__super__) {
26172617
return new CatchErrorObservable(this);
26182618
};
26192619

2620-
Enumerable.prototype.catchErrorWhen = function (notificationHandler) {
2621-
var sources = this;
2622-
return new AnonymousObservable(function (o) {
2623-
var exceptions = new Subject(),
2624-
notifier = new Subject(),
2625-
handled = notificationHandler(exceptions),
2626-
notificationDisposable = handled.subscribe(notifier);
2627-
2628-
var e = sources[$iterator$]();
2629-
2630-
var state = { isDisposed: false },
2631-
lastError,
2632-
subscription = new SerialDisposable();
2633-
var cancelable = currentThreadScheduler.scheduleRecursive(null, function (_, self) {
2634-
if (state.isDisposed) { return; }
2635-
var currentItem = tryCatch(e.next).call(e);
2636-
if (currentItem === errorObj) { return o.onError(currentItem.e); }
2637-
2638-
if (currentItem.done) {
2639-
if (lastError) {
2640-
o.onError(lastError);
2641-
} else {
2642-
o.onCompleted();
2643-
}
2644-
return;
2645-
}
2646-
2647-
// Check if promise
2648-
var currentValue = currentItem.value;
2649-
isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
2650-
2651-
var outer = new SingleAssignmentDisposable();
2652-
var inner = new SingleAssignmentDisposable();
2653-
subscription.setDisposable(new BinaryDisposable(inner, outer));
2654-
outer.setDisposable(currentValue.subscribe(
2655-
function(x) { o.onNext(x); },
2656-
function (exn) {
2657-
inner.setDisposable(notifier.subscribe(self, function(ex) {
2658-
o.onError(ex);
2659-
}, function() {
2660-
o.onCompleted();
2661-
}));
2662-
2663-
exceptions.onNext(exn);
2664-
outer.dispose();
2665-
},
2666-
function() { o.onCompleted(); }));
2667-
});
2668-
2669-
return new NAryDisposable([notificationDisposable, subscription, cancelable, new IsDisposedDisposable(state)]);
2670-
});
2671-
};
2672-
26732620
var RepeatEnumerable = (function (__super__) {
26742621
inherits(RepeatEnumerable, __super__);
26752622
function RepeatEnumerable(v, c) {
@@ -5124,19 +5071,184 @@ observableProto.zipIterable = function () {
51245071
return enumerableRepeat(this, retryCount).catchError();
51255072
};
51265073

5127-
/**
5128-
* Repeats the source observable sequence upon error each time the notifier emits or until it successfully terminates.
5129-
* if the notifier completes, the observable sequence completes.
5130-
*
5131-
* @example
5132-
* var timer = Observable.timer(500);
5133-
* var source = observable.retryWhen(timer);
5134-
* @param {Observable} [notifier] An observable that triggers the retries or completes the observable with onNext or onCompleted respectively.
5135-
* @returns {Observable} An observable sequence producing the elements of the given sequence repeatedly until it terminates successfully.
5136-
*/
5074+
function repeat(value) {
5075+
return {
5076+
'@@iterator': function () {
5077+
return {
5078+
next: function () {
5079+
return { done: false, value: value };
5080+
}
5081+
};
5082+
}
5083+
};
5084+
}
5085+
5086+
var RetryWhenObservable = (function(__super__) {
5087+
function createDisposable(state) {
5088+
return {
5089+
isDisposed: false,
5090+
dispose: function () {
5091+
if (!this.isDisposed) {
5092+
this.isDisposed = true;
5093+
state.isDisposed = true;
5094+
}
5095+
}
5096+
};
5097+
}
5098+
5099+
function RetryWhenObservable(source, notifier) {
5100+
this.source = source;
5101+
this._notifier = notifier;
5102+
__super__.call(this);
5103+
}
5104+
5105+
inherits(RetryWhenObservable, __super__);
5106+
5107+
RetryWhenObservable.prototype.subscribeCore = function (o) {
5108+
var exceptions = new Subject(),
5109+
notifier = new Subject(),
5110+
handled = this._notifier(exceptions),
5111+
notificationDisposable = handled.subscribe(notifier);
5112+
5113+
var e = this.source['@@iterator']();
5114+
5115+
var state = { isDisposed: false },
5116+
lastError,
5117+
subscription = new SerialDisposable();
5118+
var cancelable = currentThreadScheduler.scheduleRecursive(null, function (_, recurse) {
5119+
if (state.isDisposed) { return; }
5120+
var currentItem = e.next();
5121+
5122+
if (currentItem.done) {
5123+
if (lastError) {
5124+
o.onError(lastError);
5125+
} else {
5126+
o.onCompleted();
5127+
}
5128+
return;
5129+
}
5130+
5131+
// Check if promise
5132+
var currentValue = currentItem.value;
5133+
isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
5134+
5135+
var outer = new SingleAssignmentDisposable();
5136+
var inner = new SingleAssignmentDisposable();
5137+
subscription.setDisposable(new BinaryDisposable(inner, outer));
5138+
outer.setDisposable(currentValue.subscribe(
5139+
function(x) { o.onNext(x); },
5140+
function (exn) {
5141+
inner.setDisposable(notifier.subscribe(recurse, function(ex) {
5142+
o.onError(ex);
5143+
}, function() {
5144+
o.onCompleted();
5145+
}));
5146+
5147+
exceptions.onNext(exn);
5148+
outer.dispose();
5149+
},
5150+
function() { o.onCompleted(); }));
5151+
});
5152+
5153+
return new NAryDisposable([notificationDisposable, subscription, cancelable, createDisposable(state)]);
5154+
};
5155+
5156+
return RetryWhenObservable;
5157+
}(ObservableBase));
5158+
51375159
observableProto.retryWhen = function (notifier) {
5138-
return enumerableRepeat(this).catchErrorWhen(notifier);
5160+
return new RetryWhenObservable(repeat(this), notifier);
51395161
};
5162+
5163+
function repeat(value) {
5164+
return {
5165+
'@@iterator': function () {
5166+
return {
5167+
next: function () {
5168+
return { done: false, value: value };
5169+
}
5170+
};
5171+
}
5172+
};
5173+
}
5174+
5175+
var RepeatWhenObservable = (function(__super__) {
5176+
function createDisposable(state) {
5177+
return {
5178+
isDisposed: false,
5179+
dispose: function () {
5180+
if (!this.isDisposed) {
5181+
this.isDisposed = true;
5182+
state.isDisposed = true;
5183+
}
5184+
}
5185+
};
5186+
}
5187+
5188+
function RepeatWhenObservable(source, notifier) {
5189+
this.source = source;
5190+
this._notifier = notifier;
5191+
__super__.call(this);
5192+
}
5193+
5194+
inherits(RepeatWhenObservable, __super__);
5195+
5196+
RepeatWhenObservable.prototype.subscribeCore = function (o) {
5197+
var completions = new Subject(),
5198+
notifier = new Subject(),
5199+
handled = this._notifier(completions),
5200+
notificationDisposable = handled.subscribe(notifier);
5201+
5202+
var e = this.source['@@iterator']();
5203+
5204+
var state = { isDisposed: false },
5205+
lastError,
5206+
subscription = new SerialDisposable();
5207+
var cancelable = currentThreadScheduler.scheduleRecursive(null, function (_, recurse) {
5208+
if (state.isDisposed) { return; }
5209+
var currentItem = e.next();
5210+
5211+
if (currentItem.done) {
5212+
if (lastError) {
5213+
o.onError(lastError);
5214+
} else {
5215+
o.onCompleted();
5216+
}
5217+
return;
5218+
}
5219+
5220+
// Check if promise
5221+
var currentValue = currentItem.value;
5222+
isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
5223+
5224+
var outer = new SingleAssignmentDisposable();
5225+
var inner = new SingleAssignmentDisposable();
5226+
subscription.setDisposable(new BinaryDisposable(inner, outer));
5227+
outer.setDisposable(currentValue.subscribe(
5228+
function(x) { o.onNext(x); },
5229+
function (exn) { o.onError(exn); },
5230+
function() {
5231+
inner.setDisposable(notifier.subscribe(recurse, function(ex) {
5232+
o.onError(ex);
5233+
}, function() {
5234+
o.onCompleted();
5235+
}));
5236+
5237+
completions.onNext(null);
5238+
outer.dispose();
5239+
}));
5240+
});
5241+
5242+
return new NAryDisposable([notificationDisposable, subscription, cancelable, createDisposable(state)]);
5243+
};
5244+
5245+
return RepeatWhenObservable;
5246+
}(ObservableBase));
5247+
5248+
observableProto.repeatWhen = function (notifier) {
5249+
return new RepeatWhenObservable(repeat(this), notifier);
5250+
};
5251+
51405252
var ScanObservable = (function(__super__) {
51415253
inherits(ScanObservable, __super__);
51425254
function ScanObservable(source, accumulator, hasSeed, seed) {
@@ -9869,7 +9981,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
98699981
}
98709982

98719983
return new BinaryDisposable(subscription, delays);
9872-
}, this);
9984+
}, source);
98739985
}
98749986

98759987
/**

dist/rx.all.compat.map

+1-1
Large diffs are not rendered by default.

dist/rx.all.compat.min.js

+5-5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)