This repository was archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathwindowwithtimeorcount.js
60 lines (57 loc) · 2.13 KB
/
windowwithtimeorcount.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* Projects each element of an observable sequence into a window that is completed when either it's full or a given amount of time has elapsed.
* @param {Number} timeSpan Maximum time length of a window.
* @param {Number} count Maximum element count of a window.
* @param {Scheduler} [scheduler] Scheduler to run windowing timers on. If not specified, the timeout scheduler is used.
* @returns {Observable} An observable sequence of windows.
*/
observableProto.windowWithTimeOrCount = observableProto.windowTimeOrCount = function (timeSpan, count, scheduler) {
var source = this;
isScheduler(scheduler) || (scheduler = defaultScheduler);
return new AnonymousObservable(function (observer) {
var timerD = new SerialDisposable(),
groupDisposable = new CompositeDisposable(timerD),
refCountDisposable = new RefCountDisposable(groupDisposable),
n = 0,
windowId = 0,
s = new Subject();
function createTimer(id) {
var m = new SingleAssignmentDisposable();
timerD.setDisposable(m);
m.setDisposable(scheduler.scheduleFuture(null, timeSpan, function () {
if (id !== windowId) { return; }
n = 0;
var newId = ++windowId;
s.onCompleted();
s = new Subject();
observer.onNext(addRef(s, refCountDisposable));
createTimer(newId);
}));
}
observer.onNext(addRef(s, refCountDisposable));
createTimer(0);
groupDisposable.add(source.subscribe(
function (x) {
var newId = 0, newWindow = false;
s.onNext(x);
if (++n === count) {
newWindow = true;
n = 0;
newId = ++windowId;
s.onCompleted();
s = new Subject();
observer.onNext(addRef(s, refCountDisposable));
}
newWindow && createTimer(newId);
},
function (e) {
s.onError(e);
observer.onError(e);
}, function () {
s.onCompleted();
observer.onCompleted();
}
));
return refCountDisposable;
}, source);
};