-
Notifications
You must be signed in to change notification settings - Fork 105
/
Copy pathClientProgressManager.cs
65 lines (55 loc) · 2.45 KB
/
ClientProgressManager.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using MediatR;
using OmniSharp.Extensions.DebugAdapter.Protocol.Events;
using OmniSharp.Extensions.DebugAdapter.Protocol.Models;
namespace OmniSharp.Extensions.DebugAdapter.Client
{
/// <summary>
/// Routes progress start, update and end events to one <c>ProgressObservable</c> per progress id
/// and publishes every newly started observable through <see cref="Progress"/>.
/// </summary>
public class ClientProgressManager : IProgressStartHandler, IProgressUpdateHandler, IProgressEndHandler, IClientProgressManager, IDisposable
{
    // Write side of the subject that backs Progress.
    private readonly IObserver<IProgressObservable> _observer;

    // Owns the subject created in the constructor so Dispose() can release it.
    private readonly CompositeDisposable _disposable = new CompositeDisposable();

    // Progress sequences that have been started and not yet ended, keyed by progress id.
    private readonly ConcurrentDictionary<ProgressToken, ProgressObservable> _activeObservables =
        new ConcurrentDictionary<ProgressToken, ProgressObservable>(EqualityComparer<ProgressToken>.Default);

    /// <summary>
    /// Creates the manager and the subject through which started progress sequences are published.
    /// </summary>
    public ClientProgressManager()
    {
        var subject = new Subject<IProgressObservable>();
        _disposable.Add(subject);
        Progress = subject.AsObservable();
        _observer = subject;
    }

    /// <summary>
    /// Emits one <see cref="IProgressObservable"/> for every progress start event that is handled.
    /// </summary>
    public IObservable<IProgressObservable> Progress { get; }

    /// <summary>
    /// Handles a progress start event: creates the observable for the progress id, registers it,
    /// forwards the start event to it and then publishes it on <see cref="Progress"/>.
    /// </summary>
    /// <param name="request">The start event; its <c>ProgressId</c> keys the new observable.</param>
    /// <param name="cancellationToken">Not used by this handler.</param>
    /// <returns>A completed task carrying <see cref="Unit"/>.</returns>
    Task<Unit> IRequestHandler<ProgressStartEvent, Unit>.Handle(ProgressStartEvent request, CancellationToken cancellationToken)
    {
        var observable = new ProgressObservable(request.ProgressId);

        // NOTE: the TryAdd result is ignored, so if the id is already registered the existing
        // entry stays in the dictionary while the new observable is still published below.
        _activeObservables.TryAdd(request.ProgressId, observable);

        // The start event is pushed into the observable before the observable is published.
        observable.OnNext(request);
        _observer.OnNext(observable);
        return Unit.Task;
    }

    /// <summary>
    /// Handles a progress update event by forwarding it to the observable registered for its
    /// progress id. Events for ids that are not registered are ignored.
    /// </summary>
    /// <param name="request">The update event to forward.</param>
    /// <param name="cancellationToken">Not used by this handler.</param>
    /// <returns>A completed task carrying <see cref="Unit"/>.</returns>
    Task<Unit> IRequestHandler<ProgressUpdateEvent, Unit>.Handle(ProgressUpdateEvent request, CancellationToken cancellationToken)
    {
        if (_activeObservables.TryGetValue(request.ProgressId, out var observable))
        {
            observable.OnNext(request);
        }

        // TODO: Add log message for unhandled?
        return Unit.Task;
    }

    /// <summary>
    /// Handles a progress end event by forwarding it to the observable registered for its
    /// progress id and unregistering that observable. Events for ids that are not registered
    /// are ignored.
    /// </summary>
    /// <param name="request">The end event to forward.</param>
    /// <param name="cancellationToken">Not used by this handler.</param>
    /// <returns>A completed task carrying <see cref="Unit"/>.</returns>
    Task<Unit> IRequestHandler<ProgressEndEvent, Unit>.Handle(ProgressEndEvent request, CancellationToken cancellationToken)
    {
        // Remove instead of just looking up: an ended sequence receives no further events, and
        // leaving it registered would make the dictionary grow for the lifetime of the manager.
        if (_activeObservables.TryRemove(request.ProgressId, out var observable))
        {
            observable.OnNext(request);
        }

        // TODO: Add log message for unhandled?
        return Unit.Task;
    }

    /// <summary>
    /// Disposes the subject that backs <see cref="Progress"/>.
    /// </summary>
    public void Dispose() => _disposable.Dispose();
}
}