Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit c853c25

Browse files
toAsync
1 parent c37f4cc commit c853c25

File tree

2 files changed

+171
-0
lines changed

2 files changed

+171
-0
lines changed

src/modular/observable/toasync.js

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
'use strict';
2+
3+
var AsyncSubject = require('../asyncsubject');
4+
var asObservable = require('./asobservable');
5+
var isScheduler = require('../scheduler').isScheduler;
6+
var tryCatch = require('../internal/trycatchutils').tryCatch;
7+
8+
global._Rx || (global._Rx = {});
9+
if (!global._Rx.defaultScheduler) {
10+
require('../scheduler/defaultscheduler');
11+
}
12+
13+
function scheduleMethod(s, state) {
14+
var result = tryCatch(state.func).apply(state.context, state.args);
15+
if (result === global._Rx.errorObj) { return state.subject.onError(result.e); }
16+
state.subject.onNext(result);
17+
state.subject.onCompleted();
18+
}
19+
20+
module.exports = function toAsync(func, context, scheduler) {
21+
isScheduler(scheduler) || (scheduler = global._Rx.defaultScheduler);
22+
return function asyncFn () {
23+
var subject = new AsyncSubject(), len = arguments.length, args = new Array(len);
24+
for (var i = 0; i < len; i++) { args[i] = arguments[i]; }
25+
var state = {
26+
subject: subject,
27+
args: args,
28+
func: func,
29+
context: context
30+
};
31+
32+
scheduler.schedule(state, scheduleMethod);
33+
return asObservable(subject);
34+
};
35+
};

src/modular/test/toasync.js

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
'use strict';
2+
3+
var test = require('tape');
4+
var Observable = require('../observable');
5+
var TestScheduler = require('../testing/testscheduler');
6+
var reactiveAssert = require('../testing/reactiveassert');
7+
var ReactiveTest = require('../testing/reactivetest');
8+
var onNext = ReactiveTest.onNext,
9+
onError = ReactiveTest.onError,
10+
onCompleted = ReactiveTest.onCompleted;
11+
12+
Observable.addToObject({
13+
toAsync: require('../observable/toasync')
14+
});
15+
16+
test('Observable.toAsync context', function (t) {
17+
var context = { value: 42 };
18+
19+
var scheduler = new TestScheduler();
20+
21+
var results = scheduler.startScheduler(function () {
22+
return Observable.toAsync(function (x) {
23+
return this.value + x;
24+
}, context, scheduler)(42);
25+
});
26+
27+
reactiveAssert(t, results.messages, [
28+
onNext(200, 84),
29+
onCompleted(200)
30+
]);
31+
32+
t.end();
33+
});
34+
35+
test('Observable.toAsync 0', function (t) {
36+
var scheduler = new TestScheduler();
37+
38+
var results = scheduler.startScheduler(function () {
39+
return Observable.toAsync(function () {
40+
return 0;
41+
}, null, scheduler)();
42+
});
43+
44+
reactiveAssert(t, results.messages, [
45+
onNext(200, 0),
46+
onCompleted(200)
47+
]);
48+
49+
t.end();
50+
});
51+
52+
test('Observable.toAsync 1', function (t) {
53+
var scheduler = new TestScheduler();
54+
55+
var results = scheduler.startScheduler(function () {
56+
return Observable.toAsync(function (x) {
57+
return x;
58+
}, null, scheduler)(1);
59+
});
60+
61+
reactiveAssert(t, results.messages, [
62+
onNext(200, 1),
63+
onCompleted(200)
64+
]);
65+
66+
t.end();
67+
});
68+
69+
test('Observable.toAsync 2', function (t) {
70+
var scheduler = new TestScheduler();
71+
72+
var results = scheduler.startScheduler(function () {
73+
return Observable.toAsync(function (x, y) {
74+
return x + y;
75+
}, null, scheduler)(1, 2);
76+
});
77+
78+
reactiveAssert(t, results.messages, [
79+
onNext(200, 3),
80+
onCompleted(200)
81+
]);
82+
83+
t.end();
84+
});
85+
86+
test('Observable.toAsync 3', function (t) {
87+
var scheduler = new TestScheduler();
88+
89+
var results = scheduler.startScheduler(function () {
90+
return Observable.toAsync(function (x, y, z) {
91+
return x + y + z;
92+
}, null, scheduler)(1, 2, 3);
93+
});
94+
95+
reactiveAssert(t, results.messages, [
96+
onNext(200, 6),
97+
onCompleted(200)
98+
]);
99+
100+
t.end();
101+
});
102+
103+
test('Observable.toAsync 4', function (t) {
104+
var scheduler = new TestScheduler();
105+
106+
var results = scheduler.startScheduler(function () {
107+
return Observable.toAsync(function (a, b, c, d) {
108+
return a + b + c + d;
109+
}, null, scheduler)(1, 2, 3, 4);
110+
});
111+
112+
reactiveAssert(t, results.messages, [
113+
onNext(200, 10),
114+
onCompleted(200)
115+
]);
116+
117+
t.end();
118+
});
119+
120+
test('Observable.toAsync error', function (t) {
121+
var error = new Error();
122+
123+
var scheduler = new TestScheduler();
124+
125+
var results = scheduler.startScheduler(function () {
126+
return Observable.toAsync(function () {
127+
throw error;
128+
}, null, scheduler)();
129+
});
130+
131+
reactiveAssert(t, results.messages, [
132+
onError(200, error)
133+
]);
134+
135+
t.end();
136+
});

0 commit comments

Comments
 (0)