forked from OmniSharp/csharp-language-server-protocol
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathOutputHandler.cs
97 lines (88 loc) · 3.38 KB
/
OutputHandler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace OmniSharp.Extensions.JsonRpc
{
/// <summary>
/// Serializes queued messages and writes them to an output stream on a dedicated
/// background thread, prefixing each message with a <c>Content-Length</c> header.
/// </summary>
public class OutputHandler : IOutputHandler
{
    private readonly Stream _output;
    private readonly ISerializer _serializer;
    private readonly Thread _thread;
    private readonly BlockingCollection<object> _queue;
    private readonly CancellationTokenSource _cancel;
    // Captured once at construction so the worker never has to read _cancel.Token,
    // which throws ObjectDisposedException once _cancel has been disposed.
    private readonly CancellationToken _token;
    private readonly TaskCompletionSource<object> _outputIsFinished;
    // 0 until Dispose has run once, 1 afterwards.
    private int _disposed;

    /// <summary>
    /// Creates a handler that writes to <paramref name="output"/>. No thread is
    /// started until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="output">The stream messages are written to; must be writable.</param>
    /// <param name="serializer">Used to turn each queued value into its text form.</param>
    /// <exception cref="ArgumentNullException"><paramref name="output"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="output"/> is not writable.</exception>
    public OutputHandler(Stream output, ISerializer serializer)
    {
        if (output == null)
        {
            throw new ArgumentNullException(nameof(output));
        }
        if (!output.CanWrite) throw new ArgumentException($"must provide a writable stream for {nameof(output)}", nameof(output));

        _output = output;
        _serializer = serializer;
        _queue = new BlockingCollection<object>();
        _cancel = new CancellationTokenSource();
        _token = _cancel.Token;
        // Continuations must not run inline on the thread that completes the task:
        // a continuation that calls Dispose() from the output thread would make that
        // thread Join() itself and never return.
        _outputIsFinished = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
        _thread = new Thread(ProcessOutputQueue) { IsBackground = true, Name = "ProcessOutputQueue" };
    }

    /// <summary>
    /// Starts the background thread that drains the queue.
    /// </summary>
    public void Start()
    {
        _thread.Start();
    }

    /// <summary>
    /// Queues <paramref name="value"/> to be serialized and written by the output thread.
    /// </summary>
    /// <param name="value">The message object to send.</param>
    public void Send(object value)
    {
        _queue.Add(value);
    }

    /// <summary>
    /// Thread body: takes values off the queue until cancellation is requested,
    /// writing each one as a single header + content buffer. Any failure other than
    /// this handler's own cancellation is reported through the shutdown task.
    /// </summary>
    private void ProcessOutputQueue()
    {
        var token = _token;
        try
        {
            while (!token.IsCancellationRequested)
            {
                if (_queue.TryTake(out var value, Timeout.Infinite, token))
                {
                    var content = _serializer.SerializeObject(value);
                    var contentBytes = Encoding.UTF8.GetBytes(content);

                    // TODO: Is this lsp specific??
                    var headerBytes = Encoding.UTF8.GetBytes($"Content-Length: {contentBytes.Length}\r\n\r\n");

                    // Assemble header and content in one exactly-sized buffer so the
                    // stream receives the whole message in a single Write call.
                    var message = new byte[headerBytes.Length + contentBytes.Length];
                    Buffer.BlockCopy(headerBytes, 0, message, 0, headerBytes.Length);
                    Buffer.BlockCopy(contentBytes, 0, message, headerBytes.Length, contentBytes.Length);

                    if (!token.IsCancellationRequested)
                    {
                        _output.Write(message, 0, message.Length);
                        // Push the message out now in case the stream buffers writes.
                        _output.Flush();
                    }
                }
            }
        }
        catch (OperationCanceledException ex)
        {
            // Cancellation through our own token is the normal shutdown path and is
            // ignored; a cancellation from any other token is reported as a failure.
            if (ex.CancellationToken != token)
            {
                _outputIsFinished.TrySetException(ex);
            }
        }
        catch (Exception e)
        {
            _outputIsFinished.TrySetException(e);
        }
    }

    /// <summary>
    /// Returns a task that completes when <see cref="Dispose"/> is called, or faults
    /// if the output thread stops because of an exception.
    /// </summary>
    public Task WaitForShutdown()
    {
        return _outputIsFinished.Task;
    }

    /// <summary>
    /// Completes the shutdown task, stops the output thread and disposes the
    /// cancellation source and the output stream. Calling it again is a no-op.
    /// </summary>
    public void Dispose()
    {
        // Only the first call does any work; later calls would otherwise hit
        // Cancel() on an already-disposed CancellationTokenSource.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _outputIsFinished.TrySetResult(null);
        _cancel.Cancel();

        // Thread.Join throws ThreadStateException for a thread that was never
        // started, so only wait when the thread is actually running.
        if (_thread.IsAlive)
        {
            _thread.Join();
        }

        _cancel.Dispose();
        _output.Dispose();
        // _queue is intentionally left undisposed so that a Send racing with
        // shutdown still just enqueues instead of throwing ObjectDisposedException.
    }
}
}