-
Notifications
You must be signed in to change notification settings - Fork 105
/
Copy pathSettler.cs
81 lines (71 loc) · 3.38 KB
/
Settler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using static System.Reactive.Linq.Observable;
namespace OmniSharp.Extensions.JsonRpc.Testing
{
public class Settler : ISettler, IRequestSettler, IDisposable
{
private readonly JsonRpcTestOptions _options;
private readonly CancellationToken _cancellationToken;
private readonly IScheduler _scheduler;
private readonly IObservable<Unit> _settle;
private readonly IObserver<int> _requester;
private readonly IDisposable _connectable;
private readonly IObservable<Unit> _timeoutValue;
public Settler(JsonRpcTestOptions options, CancellationToken cancellationToken, IScheduler scheduler = null)
{
_options = options;
_cancellationToken = cancellationToken;
scheduler ??= Scheduler.Immediate;
_scheduler = scheduler;
_timeoutValue = Return(Unit.Default, _scheduler);
var subject = new Subject<int>();
var data = subject;
var connectable = data
.StartWith(0)
.Scan(
0, (acc, next) => {
acc += next;
return acc;
}
)
.DistinctUntilChanged()
.Select(
z => {
if (z > 0)
{
return Never<Unit>();
// return Timer(_options.Timeout, _scheduler)
// .Select(z => Unit.Default);
}
return Timer(_options.WaitTime, _scheduler)
.Select(z => Unit.Default);
}
)
.Replay(1, _scheduler);
_connectable = connectable.Connect();
_settle = connectable
.Select(o => o.Timeout(_options.Timeout, _scheduler))
.Switch();
_requester = subject.AsObserver();
}
public Task SettleNext() => SettleNextInternal().ToTask(_cancellationToken);
public IObservable<Unit> SettleNextInternal() => _settle
.Catch<Unit, Exception>(_ => _timeoutValue)
.Take(1)
.IgnoreElements()
.LastOrDefaultAsync();
public IObservable<Unit> Settle() => _settle
.Timeout(_options.Timeout, _scheduler)
.Catch<Unit, Exception>(_ => _timeoutValue);
void IRequestSettler.OnStartRequest() => _requester.OnNext(1);
void IRequestSettler.OnEndRequest() => _requester.OnNext(-1);
public void Dispose() => _connectable?.Dispose();
}
}