This repository was archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathgroupbyuntil.js
86 lines (77 loc) · 3.73 KB
/
groupbyuntil.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* Groups the elements of an observable sequence according to a specified key selector function.
* A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
* key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
*
* @example
* var res = observable.groupByUntil(function (x) { return x.id; }, null, function () { return Rx.Observable.never(); });
* 2 - observable.groupBy(function (x) { return x.id; }), function (x) { return x.name; }, function () { return Rx.Observable.never(); });
* 3 - observable.groupBy(function (x) { return x.id; }), function (x) { return x.name; }, function () { return Rx.Observable.never(); }, function (x) { return x.toString(); });
* @param {Function} keySelector A function to extract the key for each element.
* @param {Function} durationSelector A function to signal the expiration of a group.
* @returns {Observable}
* A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
* If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
*
*/
observableProto.groupByUntil = function (keySelector, elementSelector, durationSelector) {
var source = this;
return new AnonymousObservable(function (o) {
var map = new Map(),
groupDisposable = new CompositeDisposable(),
refCountDisposable = new RefCountDisposable(groupDisposable),
handleError = function (e) { return function (item) { item.onError(e); }; };
groupDisposable.add(
source.subscribe(function (x) {
var key = tryCatch(keySelector)(x);
if (key === errorObj) {
map.forEach(handleError(key.e));
return o.onError(key.e);
}
var fireNewMapEntry = false, writer = map.get(key);
if (writer === undefined) {
writer = new Subject();
map.set(key, writer);
fireNewMapEntry = true;
}
if (fireNewMapEntry) {
var group = new GroupedObservable(key, writer, refCountDisposable),
durationGroup = new GroupedObservable(key, writer);
var duration = tryCatch(durationSelector)(durationGroup);
if (duration === errorObj) {
map.forEach(handleError(duration.e));
return o.onError(duration.e);
}
o.onNext(group);
var md = new SingleAssignmentDisposable();
groupDisposable.add(md);
md.setDisposable(duration.take(1).subscribe(
noop,
function (e) {
map.forEach(handleError(e));
o.onError(e);
},
function () {
if (map['delete'](key)) { writer.onCompleted(); }
groupDisposable.remove(md);
}));
}
var element = x;
if (isFunction(elementSelector)) {
element = tryCatch(elementSelector)(x);
if (element === errorObj) {
map.forEach(handleError(element.e));
return o.onError(element.e);
}
}
writer.onNext(element);
}, function (e) {
map.forEach(handleError(e));
o.onError(e);
}, function () {
map.forEach(function (item) { item.onCompleted(); });
o.onCompleted();
}));
return refCountDisposable;
}, source);
};