forked from OmniSharp/csharp-language-server-protocol
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProcessScheduler.cs
130 lines (121 loc) · 5.06 KB
/
ProcessScheduler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace OmniSharp.Extensions.JsonRpc
{
public class ProcessScheduler : IScheduler
{
private readonly ILogger<ProcessScheduler> _logger;
private readonly BlockingCollection<(RequestProcessType type, string name, Func<Task> request)> _queue;
private readonly CancellationTokenSource _cancel;
private readonly Thread _thread;
public ProcessScheduler(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<ProcessScheduler>();
_queue = new BlockingCollection<(RequestProcessType type, string name, Func<Task> request)>();
_cancel = new CancellationTokenSource();
_thread = new Thread(ProcessRequestQueue) { IsBackground = true, Name = "ProcessRequestQueue" };
}
public void Start()
{
_thread.Start();
}
public void Add(RequestProcessType type, string name, Func<Task> request)
{
_queue.Add((type, name, request));
}
private Task Start(Func<Task> request)
{
var t = request();
if (t.Status == TaskStatus.Created) // || t.Status = TaskStatus.WaitingForActivation ?
t.Start();
return t;
}
private List<Task> RemoveCompleteTasks(List<Task> list)
{
if (list.Count == 0) return list;
var result = new List<Task>();
foreach (var t in list)
{
if (t.IsFaulted)
{
// TODO: Handle Fault
}
else if (!t.IsCompleted)
{
result.Add(t);
}
}
return result;
}
public long _TestOnly_NonCompleteTaskCount = 0;
private void ProcessRequestQueue()
{
// see https://github.com/OmniSharp/csharp-language-server-protocol/issues/4
// no need to be async, because this thing already allocated a thread on it's own.
var token = _cancel.Token;
var waitables = new List<Task>();
try
{
while (!token.IsCancellationRequested)
{
if (_queue.TryTake(out var item, Timeout.Infinite, token))
{
var (type, name, request) = item;
try
{
if (type == RequestProcessType.Serial)
{
Task.WaitAll(waitables.ToArray(), token);
Start(request).Wait(token);
}
else if (type == RequestProcessType.Parallel)
{
waitables.Add(Start(request));
}
else
throw new NotImplementedException("Only Serial and Parallel execution types can be handled currently");
waitables = RemoveCompleteTasks(waitables);
Interlocked.Exchange(ref _TestOnly_NonCompleteTaskCount, waitables.Count);
}
catch (OperationCanceledException ex) when (ex.CancellationToken == token)
{
throw;
}
catch (Exception e)
{
// TODO: Should we rethrow or swallow?
// If an exception happens... the whole system could be in a bad state, hence this throwing currently.
_logger.LogCritical(Events.UnhandledException, e, "Unhandled exception executing {Name}", name);
throw;
}
}
}
}
catch (OperationCanceledException ex) when (ex.CancellationToken == token)
{
// OperationCanceledException - The CancellationToken has been canceled.
Task.WaitAll(waitables.ToArray(), TimeSpan.FromMilliseconds(1000));
var keeponrunning = RemoveCompleteTasks(waitables);
Interlocked.Exchange(ref _TestOnly_NonCompleteTaskCount, keeponrunning.Count);
keeponrunning.ForEach((t) =>
{
// TODO: There is no way to abort a Task. As we don't construct the tasks, we can do nothing here
// Option is: change the task factory "Func<Task> request" to a "Func<CancellationToken, Task> request"
});
}
}
private bool _disposed = false;
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_cancel.Cancel();
_thread.Join();
_cancel.Dispose();
}
}
}