1
1
using System ;
2
2
using System . Reactive ;
3
3
using System . Reactive . Concurrency ;
4
+ using System . Reactive . Linq ;
4
5
using System . Reactive . Subjects ;
5
6
using System . Reactive . Threading . Tasks ;
6
7
using System . Threading ;
@@ -11,21 +12,21 @@ namespace OmniSharp.Extensions.JsonRpc.Testing
11
12
{
12
13
public class Settler : ISettler , IRequestSettler , IDisposable
13
14
{
14
- private readonly TimeSpan _timeout ;
15
+ private readonly JsonRpcTestOptions _options ;
15
16
private readonly CancellationToken _cancellationToken ;
16
17
private readonly IScheduler _scheduler ;
17
18
private readonly IObservable < Unit > _settle ;
18
19
private readonly IObserver < int > _requester ;
19
20
private readonly IDisposable _connectable ;
20
- private readonly IObservable < Unit > _defaultValue ;
21
+ private readonly IObservable < Unit > _timeoutValue ;
21
22
22
- public Settler ( TimeSpan waitTime , TimeSpan timeout , CancellationToken cancellationToken , IScheduler scheduler = null )
23
+ public Settler ( JsonRpcTestOptions options , CancellationToken cancellationToken , IScheduler scheduler = null )
23
24
{
24
- _timeout = timeout ;
25
+ _options = options ;
25
26
_cancellationToken = cancellationToken ;
26
27
scheduler ??= Scheduler . Immediate ;
27
28
_scheduler = scheduler ;
28
- _defaultValue = Return ( Unit . Default , _scheduler ) ;
29
+ _timeoutValue = Return ( Unit . Default , _scheduler ) ;
29
30
var subject = new Subject < int > ( ) ;
30
31
var data = subject ;
31
32
@@ -42,24 +43,34 @@ public Settler(TimeSpan waitTime, TimeSpan timeout, CancellationToken cancellati
42
43
z => {
43
44
if ( z > 0 )
44
45
{
45
- return Timer ( _timeout , _scheduler )
46
- . Select ( z => Unit . Default ) ;
46
+ return Never < Unit > ( ) ;
47
+ // return Timer(_options.Timeout, _scheduler)
48
+ // .Select(z => Unit.Default);
47
49
}
48
50
49
- return Amb ( Timer ( waitTime , _scheduler ) , Timer ( _timeout , _scheduler ) )
51
+ return Timer ( _options . WaitTime , _scheduler )
50
52
. Select ( z => Unit . Default ) ;
51
53
}
52
54
)
53
55
. Replay ( 1 , _scheduler ) ;
54
56
_connectable = connectable . Connect ( ) ;
55
57
_settle = connectable
56
- . Switch ( ) ;
58
+ . Select ( o => o . Timeout ( _options . Timeout , _scheduler ) )
59
+ . Switch ( ) ;
57
60
_requester = subject . AsObserver ( ) ;
58
61
}
59
62
60
- public Task SettleNext ( ) => _settle . Take ( 1 ) . IgnoreElements ( ) . LastOrDefaultAsync ( ) . ToTask ( _cancellationToken ) ;
63
+ public Task SettleNext ( ) => SettleNextInternal ( ) . ToTask ( _cancellationToken ) ;
61
64
62
- public IObservable < Unit > Settle ( ) => _settle . Timeout ( _timeout , _scheduler ) . Catch < Unit , Exception > ( _ => _defaultValue ) ;
65
+ public IObservable < Unit > SettleNextInternal ( ) => _settle
66
+ . Catch < Unit , Exception > ( _ => _timeoutValue )
67
+ . Take ( 1 )
68
+ . IgnoreElements ( )
69
+ . LastOrDefaultAsync ( ) ;
70
+
71
+ public IObservable < Unit > Settle ( ) => _settle
72
+ . Timeout ( _options . Timeout , _scheduler )
73
+ . Catch < Unit , Exception > ( _ => _timeoutValue ) ;
63
74
64
75
void IRequestSettler . OnStartRequest ( ) => _requester . OnNext ( 1 ) ;
65
76
0 commit comments