diff --git a/src/JsonRpc/InputHandler.cs b/src/JsonRpc/InputHandler.cs index 3b9d68544..9e3137bdd 100644 --- a/src/JsonRpc/InputHandler.cs +++ b/src/JsonRpc/InputHandler.cs @@ -72,52 +72,59 @@ private void ProcessInputStream() // content is encoded in UTF-8 while (true) { - if (_inputThread == null) return; - - var buffer = new byte[300]; - var current = _input.Read(buffer, 0, MinBuffer); - if (current == 0) return; // no more _input - while (current < MinBuffer || - buffer[current - 4] != CR || buffer[current - 3] != LF || - buffer[current - 2] != CR || buffer[current - 1] != LF) - { - var n = _input.Read(buffer, current, 1); - if (n == 0) return; // no more _input, mitigates endless loop here. - current += n; - } + try { + if (_inputThread == null) return; + + var buffer = new byte[300]; + var current = _input.Read(buffer, 0, MinBuffer); + if (current == 0) return; // no more _input + while (current < MinBuffer || + buffer[current - 4] != CR || buffer[current - 3] != LF || + buffer[current - 2] != CR || buffer[current - 1] != LF) + { + var n = _input.Read(buffer, current, 1); + if (n == 0) return; // no more _input, mitigates endless loop here. + current += n; + } - var headersContent = System.Text.Encoding.ASCII.GetString(buffer, 0, current); - var headers = headersContent.Split(HeaderKeys, StringSplitOptions.RemoveEmptyEntries); - long length = 0; - for (var i = 1; i < headers.Length; i += 2) - { - // starting at i = 1 instead of 0 won't throw, if we have uneven headers' length - var header = headers[i - 1]; - var value = headers[i].Trim(); - if (header.Equals("Content-Length", StringComparison.OrdinalIgnoreCase)) + var headersContent = System.Text.Encoding.ASCII.GetString(buffer, 0, current); + var headers = headersContent.Split(HeaderKeys, StringSplitOptions.RemoveEmptyEntries); + long length = 0; + for (var i = 1; i < headers.Length; i += 2) { - length = 0; - long.TryParse(value, out length); + // starting at i = 1 instead of 0 won't throw, if we have uneven headers' length + var header = headers[i - 1]; + var value = headers[i].Trim(); + if (header.Equals("Content-Length", StringComparison.OrdinalIgnoreCase)) + { + length = 0; + long.TryParse(value, out length); + } } - } - if (length == 0 || length >= int.MaxValue) - { - HandleRequest(string.Empty); - } - else - { - var requestBuffer = new byte[length]; - var received = 0; - while (received < length) + if (length == 0 || length >= int.MaxValue) + { + HandleRequest(string.Empty); + } + else { - var n = _input.Read(requestBuffer, received, requestBuffer.Length - received); - if (n == 0) return; // no more _input - received += n; + var requestBuffer = new byte[length]; + var received = 0; + while (received < length) + { + var n = _input.Read(requestBuffer, received, requestBuffer.Length - received); + if (n == 0) return; // no more _input + received += n; + } + // TODO sometimes: encoding should be based on the respective header (including the wrong "utf8" value) + var payload = System.Text.Encoding.UTF8.GetString(requestBuffer); + HandleRequest(payload); } - // TODO sometimes: encoding should be based on the respective header (including the wrong "utf8" value) - var payload = System.Text.Encoding.UTF8.GetString(requestBuffer); - HandleRequest(payload); + } + catch (IOException) + { + _logger.LogError("Input stream has been closed."); + break; } } } @@ -229,9 +236,10 @@ private void HandleRequest(string request) public void Dispose() { + _scheduler.Dispose(); _outputHandler.Dispose(); _inputThread = null; - _scheduler.Dispose(); + _input?.Dispose(); } } } diff --git a/src/JsonRpc/OutputHandler.cs b/src/JsonRpc/OutputHandler.cs index 4b27c999f..9575cce55 100644 --- a/src/JsonRpc/OutputHandler.cs +++ b/src/JsonRpc/OutputHandler.cs @@ -23,6 +23,7 @@ public OutputHandler(Stream output, ISerializer serializer) _serializer = serializer; _queue = new BlockingCollection(); _cancel = new CancellationTokenSource(); + _outputIsFinished = new TaskCompletionSource(); _thread = new Thread(ProcessOutputQueue) { IsBackground = true, Name = "ProcessOutputQueue" }; } @@ -41,7 +42,7 @@ private void ProcessOutputQueue() var token = _cancel.Token; try { - while (true) + while (!token.IsCancellationRequested) { if (_queue.TryTake(out var value, Timeout.Infinite, token)) { @@ -59,7 +60,10 @@ private void ProcessOutputQueue() { ms.Write(headerBytes, 0, headerBytes.Length); ms.Write(contentBytes, 0, contentBytes.Length); - _output.Write(ms.ToArray(), 0, (int)ms.Position); + if(!token.IsCancellationRequested) + { + _output.Write(ms.ToArray(), 0, (int)ms.Position); + } } } } @@ -67,12 +71,12 @@ private void ProcessOutputQueue() catch (OperationCanceledException ex) { if (ex.CancellationToken != token) - _outputIsFinished.SetException(ex); + _outputIsFinished.TrySetException(ex); // else ignore. Exceptions: OperationCanceledException - The CancellationToken has been canceled. } catch (Exception e) { - _outputIsFinished.SetException(e); + _outputIsFinished.TrySetException(e); } } diff --git a/src/JsonRpc/ProcessScheduler.cs b/src/JsonRpc/ProcessScheduler.cs index c71a81822..7ad98c52d 100644 --- a/src/JsonRpc/ProcessScheduler.cs +++ b/src/JsonRpc/ProcessScheduler.cs @@ -68,7 +68,7 @@ private void ProcessRequestQueue() var waitables = new List(); try { - while (true) + while (!token.IsCancellationRequested) { if (_queue.TryTake(out var item, Timeout.Infinite, token)) {