Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit c37f4cc

Browse files
repeat
1 parent 6d42171 commit c37f4cc

File tree

7 files changed

+1559
-3
lines changed

7 files changed

+1559
-3
lines changed

src/modular/foo.js

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
var $iterator$ = '@@iterator';
2+
3+
function repeatValue(value, count) {
4+
count == null && (count = -1);
5+
return {
6+
'@@iterator': function () {
7+
return {
8+
remaining: count,
9+
next: function () {
10+
if (this.remaining === 0) { return { done: true, value: undefined }; }
11+
if (this.remaining > 0) { this.remaining--; }
12+
return { done: false, value: value };
13+
}
14+
};
15+
}
16+
};
17+
}
18+
19+
var e = repeatValue(42, 10)[$iterator$]();
20+
var result;
21+
do {
22+
result = e.next();
23+
console.log(result.value);
24+
} while (!result.done);
25+
26+
function createDisposable(state) {
27+
return {
28+
isDisposed: false,
29+
dispose: function () {
30+
if (!this.isDisposed) {
31+
this.isDisposed = true;
32+
state.isDisposed = true;
33+
}
34+
}
35+
};
36+
}
37+
38+
var o = { isDisposed: false };
39+
var d = createDisposable(o);
40+
console.log(o.isDisposed);
41+
d.dispose();
42+
console.log(o.isDisposed);
43+
d.dispose();
44+
console.log(o.isDisposed);

src/modular/observable/repeat.js

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
'use strict';
2+
3+
var ObservableBase = require('./observablebase');
4+
var fromPromise = require('./frompromise');
5+
var isPromise = require('../helpers/ispromise');
6+
var AbstractObserver = require('../observer/abstractobserver');
7+
var NAryDisposable = require('../narydisposable');
8+
var SerialDisposable = require('../serialdisposable');
9+
var SingleAssignmentDisposable = require('../singleassignmentdisposable');
10+
var inherits = require('inherits');
11+
12+
global._Rx || (global._Rx = {});
13+
if (!global._Rx.currentThreadScheduler) {
14+
require('../scheduler/currentthreadscheduler');
15+
}
16+
17+
var $iterator$ = '@@iterator';
18+
19+
function repeatValue(value, count) {
20+
count == null && (count = -1);
21+
return {
22+
'@@iterator': function () {
23+
return {
24+
remaining: count,
25+
next: function () {
26+
if (this.remaining === 0) { return { done: true, value: undefined }; }
27+
if (this.remaining > 0) { this.remaining--; }
28+
return { done: false, value: value };
29+
}
30+
};
31+
}
32+
};
33+
}
34+
35+
function createDisposable(state) {
36+
return {
37+
isDisposed: false,
38+
dispose: function () {
39+
if (!this.isDisposed) {
40+
this.isDisposed = true;
41+
state.isDisposed = true;
42+
}
43+
}
44+
};
45+
}
46+
47+
function ConcatObserver(state, recurse) {
48+
this._state = state;
49+
this._recurse = recurse;
50+
AbstractObserver.call(this);
51+
}
52+
53+
inherits(ConcatObserver, AbstractObserver);
54+
55+
ConcatObserver.prototype.next = function (x) { this._state.o.onNext(x); };
56+
ConcatObserver.prototype.error = function (e) { this._state.o.onError(e); };
57+
ConcatObserver.prototype.completed = function () { this._recurse(this._state); };
58+
59+
function ConcatObservable(sources) {
60+
this.sources = sources;
61+
ObservableBase.call(this);
62+
}
63+
64+
inherits(ConcatObservable, ObservableBase);
65+
66+
function scheduleMethod(state, recurse) {
67+
if (state.isDisposed) { return; }
68+
var currentItem = state.e.next();
69+
if (currentItem.done) { return state.o.onCompleted(); }
70+
71+
// Check if promise
72+
var currentValue = currentItem.value;
73+
isPromise(currentValue) && (currentValue = fromPromise(currentValue));
74+
75+
var d = new SingleAssignmentDisposable();
76+
state.subscription.setDisposable(d);
77+
d.setDisposable(currentValue.subscribe(new ConcatObserver(state, recurse)));
78+
}
79+
80+
ConcatObservable.prototype.subscribeCore = function (o) {
81+
var subscription = new SerialDisposable();
82+
var state = {
83+
isDisposed: false,
84+
o: o,
85+
subscription: subscription,
86+
e: this.sources[$iterator$]()
87+
};
88+
89+
var cancelable = global._Rx.currentThreadScheduler.scheduleRecursive(state, scheduleMethod);
90+
return new NAryDisposable([subscription, cancelable, createDisposable(state)]);
91+
};
92+
93+
module.exports = function repeat(source, repeatCount) {
94+
return new ConcatObservable(repeatValue(source, repeatCount));
95+
};

src/modular/observable/repeatvalue.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
'use strict';
2+
3+
var just = require('./just');
4+
var repeat = require('./repeat');
5+
var isScheduler = require('../scheduler').isScheduler;
6+
7+
global._Rx || (global._Rx = {});
8+
if (!global._Rx.currentThreadScheduler) {
9+
require('../scheduler/currentthreadscheduler');
10+
}
11+
12+
module.exports = function repeatValue(value, repeatCount, scheduler) {
13+
isScheduler(scheduler) || (scheduler = global._Rx.currentThreadScheduler);
14+
return repeat(just(value, scheduler), repeatCount);
15+
};

src/modular/observable/replay.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
'use strict';
2+
3+
var multicast = require('./multicast');
4+
var ReplaySubject = require('../replaysubject');
5+
var isFunction = require('../helpers/isfunction');
6+
7+
module.exports = function replay (source, selector, bufferSize, windowSize, scheduler) {
8+
return isFunction(selector) ?
9+
multicast(source, function () { return new ReplaySubject(bufferSize, windowSize, scheduler); }, selector) :
10+
multicast(source, new ReplaySubject(bufferSize, windowSize, scheduler));
11+
};

src/modular/observable/retry.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ var AbstractObserver = require('../observer/abstractobserver');
77
var NAryDisposable = require('../narydisposable');
88
var SerialDisposable = require('../serialdisposable');
99
var SingleAssignmentDisposable = require('../singleassignmentdisposable');
10-
var tryCatch = require('../internal/trycatchutils').tryCatch;
1110
var inherits = require('inherits');
1211

1312
global._Rx || (global._Rx = {});
@@ -18,6 +17,7 @@ if (!global._Rx.currentThreadScheduler) {
1817
var $iterator$ = '@@iterator';
1918

2019
function repeat(value, count) {
20+
count == null && (count = -1);
2121
return {
2222
'@@iterator': function () {
2323
return {
@@ -65,8 +65,7 @@ inherits(CatchErrorObservable, ObservableBase);
6565

6666
function scheduleMethod(state, recurse) {
6767
if (state.isDisposed) { return; }
68-
var currentItem = tryCatch(state.e.next).call(state.e);
69-
if (currentItem === global._Rx.errorObj) { return state.o.onError(currentItem.e); }
68+
var currentItem = state.e.next();
7069
if (currentItem.done) { return state.lastError !== null ? state.o.onError(state.lastError) : state.o.onCompleted(); }
7170

7271
var currentValue = currentItem.value;

0 commit comments

Comments
 (0)