-
Notifications
You must be signed in to change notification settings - Fork 105
/
Copy pathAggregateSettler.cs
31 lines (27 loc) · 1.01 KB
/
AggregateSettler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;
namespace OmniSharp.Extensions.JsonRpc.Testing
{
/// <summary>
/// An <see cref="ISettler"/> that fans out to a fixed set of inner settlers and
/// presents them as a single settler.
/// </summary>
public class AggregateSettler : ISettler
{
    private readonly ISettler[] _settlers;

    /// <summary>
    /// Creates an aggregate over the supplied settlers.
    /// </summary>
    /// <param name="settlers">
    /// The settlers to aggregate. The array reference is stored as given (no copy is made).
    /// </param>
    public AggregateSettler(params ISettler[] settlers)
    {
        _settlers = settlers;
    }

    /// <summary>
    /// Calls <see cref="ISettler.SettleNext"/> on every inner settler.
    /// </summary>
    /// <returns>A task produced by <see cref="Task.WhenAll(System.Collections.Generic.IEnumerable{Task})"/> over the inner tasks.</returns>
    public Task SettleNext()
    {
        var pending = _settlers.Select(settler => settler.SettleNext());
        return Task.WhenAll(pending);
    }

    /// <summary>
    /// Merges the <see cref="ISettler.Settle"/> streams of all inner settlers into one stream of
    /// notifications.
    /// </summary>
    /// <remarks>
    /// Each inner stream is tagged with a zero-based emission counter. The latest counters of all
    /// settlers are combined, and the running value only advances when every settler reports the
    /// same counter; otherwise the previous value is kept. Repeated values are dropped, and each
    /// surviving value is projected to <see cref="Unit.Default"/>.
    /// NOTE(review): the accumulator is seeded with 0, which is also the value for "all settlers at
    /// their first emission", so the first combined snapshot appears to always yield a
    /// notification even when the counters differ — confirm this is intended.
    /// </remarks>
    /// <returns>An observable of <see cref="Unit"/> notifications.</returns>
    public IObservable<Unit> Settle()
    {
        // Tag every inner emission with the settler's position and its per-stream emission number.
        var numbered = _settlers.Select(
            (settler, position) => settler
                .Settle()
                .Select((_, emission) => new { Position = position, Emission = emission })
        );

        return numbered
            .CombineLatest()
            .Scan(
                0,
                (previous, snapshot) =>
                {
                    var highest = snapshot.Max(entry => entry.Emission);

                    // Hold the previous value until every settler has reached the highest counter.
                    if (snapshot.Any(entry => entry.Emission != highest))
                    {
                        return previous;
                    }

                    return highest;
                }
            )
            .DistinctUntilChanged()
            .Select(_ => Unit.Default);
    }
}
}