diff --git a/src/JsonRpc/Connection.cs b/src/JsonRpc/Connection.cs index 546328a9e..27018bd86 100644 --- a/src/JsonRpc/Connection.cs +++ b/src/JsonRpc/Connection.cs @@ -11,6 +11,7 @@ public class Connection : IDisposable private readonly InputHandler _inputHandler; public bool IsOpen { get; private set; } + [Obsolete("Use the other constructor that takes a request invoker")] public Connection( PipeReader input, IOutputHandler outputHandler, @@ -25,21 +26,49 @@ public Connection( int concurrency, IScheduler scheduler, CreateResponseExceptionHandler? getException = null + ) : this( + input, + outputHandler, + receiver, + requestRouter, + responseRouter, + new DefaultRequestInvoker( + requestRouter, + outputHandler, + requestProcessIdentifier, + new RequestInvokerOptions( + requestTimeout, + supportContentModified, + concurrency), + loggerFactory, + scheduler), + loggerFactory, + onUnhandledException, + getException) + { + } + + public Connection( + PipeReader input, + IOutputHandler outputHandler, + IReceiver receiver, + IRequestRouter requestRouter, + IResponseRouter responseRouter, + RequestInvoker requestInvoker, + ILoggerFactory loggerFactory, + OnUnhandledExceptionHandler onUnhandledException, + CreateResponseExceptionHandler? getException = null ) => _inputHandler = new InputHandler( input, outputHandler, receiver, - requestProcessIdentifier, requestRouter, responseRouter, + requestInvoker, loggerFactory, onUnhandledException, - getException, - requestTimeout, - supportContentModified, - concurrency > 1 ? (int?) concurrency : null, - scheduler + getException ); public void Open() diff --git a/src/JsonRpc/ContentModified.cs b/src/JsonRpc/ContentModified.cs index 9407faca7..b5b8c821a 100644 --- a/src/JsonRpc/ContentModified.cs +++ b/src/JsonRpc/ContentModified.cs @@ -1,15 +1,15 @@ -using OmniSharp.Extensions.JsonRpc.Server; +using OmniSharp.Extensions.JsonRpc.Server; using OmniSharp.Extensions.JsonRpc.Server.Messages; namespace OmniSharp.Extensions.JsonRpc { public class ContentModified : RpcError { - internal ContentModified(string method) : base(null, method, new ErrorMessage(ErrorCodes.ContentModified, "Content Modified")) + public ContentModified(string method) : base(null, method, new ErrorMessage(ErrorCodes.ContentModified, "Content Modified")) { } - internal ContentModified(object id, string method) : base(id, method, new ErrorMessage(ErrorCodes.ContentModified, "Content Modified")) + public ContentModified(object id, string method) : base(id, method, new ErrorMessage(ErrorCodes.ContentModified, "Content Modified")) { } } diff --git a/src/JsonRpc/DefaultRequestInvoker.cs b/src/JsonRpc/DefaultRequestInvoker.cs new file mode 100644 index 000000000..f4657dc97 --- /dev/null +++ b/src/JsonRpc/DefaultRequestInvoker.cs @@ -0,0 +1,181 @@ +using System; +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Reactive.Linq; +using Microsoft.Extensions.Logging; +using OmniSharp.Extensions.JsonRpc.Server; +using OmniSharp.Extensions.JsonRpc.Server.Messages; +using Notification = OmniSharp.Extensions.JsonRpc.Server.Notification; + +namespace OmniSharp.Extensions.JsonRpc +{ + public class DefaultRequestInvoker : RequestInvoker + { + private readonly IRequestRouter _requestRouter; + private readonly IOutputHandler _outputHandler; + private readonly ProcessScheduler _processScheduler; + private readonly IRequestProcessIdentifier _requestProcessIdentifier; + private readonly RequestInvokerOptions _options; + private readonly ILogger _logger; + + public DefaultRequestInvoker( + IRequestRouter requestRouter, + IOutputHandler outputHandler, + IRequestProcessIdentifier requestProcessIdentifier, + RequestInvokerOptions options, + ILoggerFactory loggerFactory, + IScheduler scheduler) + { + _requestRouter = requestRouter; + _outputHandler = outputHandler; + _requestProcessIdentifier = requestProcessIdentifier; + _options = options; + _processScheduler = new ProcessScheduler(loggerFactory, _options.SupportContentModified, _options.Concurrency, scheduler); + _logger = loggerFactory.CreateLogger(); + } + + public override RequestInvocationHandle InvokeRequest(IRequestDescriptor descriptor, Request request) + { + if (descriptor.Default is null) + { + throw new ArgumentNullException(nameof(descriptor.Default)); + } + + var handle = new RequestInvocationHandle(request); + var type = _requestProcessIdentifier.Identify(descriptor.Default); + + var schedulerDelegate = RouteRequest(descriptor, request, handle); + _processScheduler.Add(type, $"{request.Method}:{request.Id}", schedulerDelegate); + + return handle; + } + + public override void InvokeNotification(IRequestDescriptor descriptor, Notification notification) + { + if (descriptor.Default is null) + { + throw new ArgumentNullException(nameof(descriptor.Default)); + } + + var type = _requestProcessIdentifier.Identify(descriptor.Default); + var schedulerDelegate = RouteNotification(descriptor, notification); + _processScheduler.Add(type, notification.Method, schedulerDelegate); + } + + public override void Dispose() + { + _processScheduler.Dispose(); + } + + private SchedulerDelegate RouteRequest( + IRequestDescriptor descriptor, + Request request, + RequestInvocationHandle handle) + { + var cts = handle.CancellationTokenSource; + return (contentModifiedToken, scheduler) => + Observable.Create( + observer => { + // ITS A RACE! + var sub = Observable.Amb( + contentModifiedToken.Select( + _ => { + _logger.LogTrace( + "Request {Id} was abandoned due to content be modified", request.Id + ); + return new ErrorResponse( + new ContentModified(request.Id, request.Method) + ); + } + ), + Observable.Timer(_options.RequestTimeout, scheduler).Select( + _ => new ErrorResponse(new RequestCancelled(request.Id, request.Method)) + ), + Observable.FromAsync( + async ct => { + using var timer = _logger.TimeDebug( + "Processing request {Method} {ResponseId}", request.Method, + request.Id + ); + ct.Register(cts.Cancel); + // ObservableToToken(contentModifiedToken).Register(cts.Cancel); + try + { + var result = await _requestRouter.RouteRequest( + descriptor, request, cts.Token + ).ConfigureAwait(false); + return result; + } + catch (OperationCanceledException) + { + _logger.LogTrace("Request {Id} was cancelled", request.Id); + return new RequestCancelled(request.Id, request.Method); + } + catch (RpcErrorException e) + { + _logger.LogCritical( + Events.UnhandledRequest, e, + "Failed to handle request {Method} {RequestId}", request.Method, + request.Id + ); + return new RpcError( + request.Id, request.Method, + new ErrorMessage(e.Code, e.Message, e.Error) + ); + } + catch (Exception e) + { + _logger.LogCritical( + Events.UnhandledRequest, e, + "Failed to handle request {Method} {RequestId}", request.Method, + request.Id + ); + return new InternalError(request.Id, request.Method, e.ToString()); + } + } + ) + ) + .Subscribe(observer); + return new CompositeDisposable(sub, handle); + } + ) + .Select( + response => { + _outputHandler.Send(response.Value); + return Unit.Default; + } + ); + } + + private SchedulerDelegate RouteNotification( + IRequestDescriptor descriptors, + Notification notification) => + (_, scheduler) => + // ITS A RACE! + Observable.Amb( + Observable.Timer(_options.RequestTimeout, scheduler) + .Select(_ => Unit.Default) + .Do( + _ => _logger.LogTrace("Notification was cancelled due to timeout") + ), + Observable.FromAsync( + async ct => { + using var timer = _logger.TimeDebug("Processing notification {Method}", notification.Method); + try + { + await _requestRouter.RouteNotification(descriptors, notification, ct).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + _logger.LogTrace("Notification was cancelled"); + } + catch (Exception e) + { + _logger.LogCritical(Events.UnhandledRequest, e, "Failed to handle request {Method}", notification.Method); + } + } + ) + ); + } +} diff --git a/src/JsonRpc/InputHandler.cs b/src/JsonRpc/InputHandler.cs index 6d267c101..ab7a23058 100644 --- a/src/JsonRpc/InputHandler.cs +++ b/src/JsonRpc/InputHandler.cs @@ -19,32 +19,29 @@ using Newtonsoft.Json.Linq; using OmniSharp.Extensions.JsonRpc.Server; using OmniSharp.Extensions.JsonRpc.Server.Messages; -using Notification = OmniSharp.Extensions.JsonRpc.Server.Notification; namespace OmniSharp.Extensions.JsonRpc { public class InputHandler : IInputHandler, IDisposable { public static readonly byte[] HeadersFinished = - new[] { (byte) '\r', (byte) '\n', (byte) '\r', (byte) '\n' }.ToArray(); + new[] { (byte)'\r', (byte)'\n', (byte)'\r', (byte)'\n' }.ToArray(); public const int HeadersFinishedLength = 4; public static readonly char[] HeaderKeys = { '\r', '\n', ':' }; public const short MinBuffer = 21; // Minimum size of the buffer "Content-Length: X\r\n\r\n" - public static readonly byte[] ContentLength = "Content-Length".Select(x => (byte) x).ToArray(); + public static readonly byte[] ContentLength = "Content-Length".Select(x => (byte)x).ToArray(); public static readonly int ContentLengthLength = 14; private readonly PipeReader _pipeReader; private readonly IOutputHandler _outputHandler; private readonly IReceiver _receiver; - private readonly IRequestProcessIdentifier _requestProcessIdentifier; private readonly IRequestRouter _requestRouter; private readonly IResponseRouter _responseRouter; private readonly OnUnhandledExceptionHandler _unhandledInputProcessException; private readonly CreateResponseExceptionHandler? _getException; - private readonly TimeSpan _requestTimeout; private readonly ILogger _logger; - private readonly ProcessScheduler _scheduler; + private readonly RequestInvoker _requestInvoker; private readonly Memory _headersBuffer; private readonly Memory _contentLengthBuffer; private readonly byte[] _contentLengthValueBuffer; @@ -53,11 +50,12 @@ public class InputHandler : IInputHandler, IDisposable private readonly CompositeDisposable _disposable; private readonly AsyncSubject _inputActive; - private readonly ConcurrentDictionary descriptor)> _requests = - new ConcurrentDictionary descriptor)>(); + private readonly ConcurrentDictionary _requests = + new ConcurrentDictionary(); private readonly Subject> _inputQueue; + [Obsolete("Use the other constructor that takes a request invoker")] public InputHandler( PipeReader pipeReader, IOutputHandler outputHandler, @@ -72,24 +70,49 @@ public InputHandler( bool supportContentModified, int? concurrency, IScheduler scheduler + ) : this( + pipeReader, + outputHandler, + receiver, + requestRouter, + responseRouter, + new DefaultRequestInvoker( + requestRouter, + outputHandler, + requestProcessIdentifier, + new RequestInvokerOptions( + requestTimeout, + supportContentModified, + concurrency ?? 0), + loggerFactory, + scheduler), + loggerFactory, + unhandledInputProcessException, + getException) + { + } + + public InputHandler( + PipeReader pipeReader, + IOutputHandler outputHandler, + IReceiver receiver, + IRequestRouter requestRouter, + IResponseRouter responseRouter, + RequestInvoker requestInvoker, + ILoggerFactory loggerFactory, + OnUnhandledExceptionHandler unhandledInputProcessException, + CreateResponseExceptionHandler? getException ) { _pipeReader = pipeReader; _outputHandler = outputHandler; _receiver = receiver; - _requestProcessIdentifier = requestProcessIdentifier; _requestRouter = requestRouter; _responseRouter = responseRouter; + _requestInvoker = requestInvoker; _unhandledInputProcessException = unhandledInputProcessException; _getException = getException; - _requestTimeout = requestTimeout; _logger = loggerFactory.CreateLogger(); - _scheduler = new ProcessScheduler( - loggerFactory, - supportContentModified, - concurrency, - scheduler - ); _headersBuffer = new Memory(new byte[HeadersFinishedLength]); _contentLengthBuffer = new Memory(new byte[ContentLengthLength]); _contentLengthValueBuffer = new byte[20]; // Max string length of the long value @@ -100,7 +123,7 @@ IScheduler scheduler _disposable = new CompositeDisposable { Disposable.Create(() => _stopProcessing.Cancel()), _stopProcessing, - _scheduler, + _requestInvoker, }; _inputActive = new AsyncSubject(); @@ -146,7 +169,7 @@ private bool TryParseHeaders(ref ReadOnlySequence buffer, out ReadOnlySequ var rentedSpan = _headersBuffer.Span; - var start = buffer.PositionOf((byte) '\r'); + var start = buffer.PositionOf((byte)'\r'); do { if (!start.HasValue) @@ -171,7 +194,7 @@ private bool TryParseHeaders(ref ReadOnlySequence buffer, out ReadOnlySequ return true; } - start = buffer.Slice(buffer.GetPosition(HeadersFinishedLength, start.Value)).PositionOf((byte) '\r'); + start = buffer.Slice(buffer.GetPosition(HeadersFinishedLength, start.Value)).PositionOf((byte)'\r'); } while (start.HasValue && buffer.Length > MinBuffer); line = default; @@ -213,7 +236,7 @@ private bool TryParseContentLength(ref ReadOnlySequence buffer, out long l { do { - var colon = buffer.PositionOf((byte) ':'); + var colon = buffer.PositionOf((byte)':'); if (!colon.HasValue) { length = -1; @@ -232,7 +255,7 @@ private bool TryParseContentLength(ref ReadOnlySequence buffer, out long l { foreach (var t in memory.Span) { - if (t == (byte) ' ') + if (t == (byte)' ') { offset++; continue; @@ -244,10 +267,10 @@ private bool TryParseContentLength(ref ReadOnlySequence buffer, out long l var lengthSlice = buffer.Slice( buffer.GetPosition(offset, colon.Value), - buffer.PositionOf((byte) '\r') ?? buffer.End + buffer.PositionOf((byte)'\r') ?? buffer.End ); - var whitespacePosition = lengthSlice.PositionOf((byte) ' '); + var whitespacePosition = lengthSlice.PositionOf((byte)' '); if (whitespacePosition.HasValue) { lengthSlice = lengthSlice.Slice(0, whitespacePosition!.Value); @@ -268,7 +291,7 @@ private bool TryParseContentLength(ref ReadOnlySequence buffer, out long l return false; } - buffer = buffer.Slice(buffer.GetPosition(1, buffer.PositionOf((byte) '\n') ?? buffer.End)); + buffer = buffer.Slice(buffer.GetPosition(1, buffer.PositionOf((byte)'\n') ?? buffer.End)); } while (true); } @@ -422,8 +445,10 @@ private void HandleRequest(in ReadOnlySequence request) return; } - var type = _requestProcessIdentifier.Identify(descriptor.Default); - _scheduler.Add(type, $"{item.Request.Method}:{item.Request.Id}", RouteRequest(descriptor, item.Request)); + var requestHandle = _requestInvoker.InvokeRequest(descriptor, item.Request); + + _requests.TryAdd(requestHandle.Request.Id, requestHandle); + requestHandle.OnComplete += (request) => _requests.TryRemove(request.Id, out _); } catch (JsonReaderException e) { @@ -453,9 +478,9 @@ private void HandleRequest(in ReadOnlySequence request) } _logger.LogDebug("Cancelling pending request", item.Notification.Method); - if (_requests.TryGetValue(cancelParams.Id, out var d)) + if (_requests.TryGetValue(cancelParams.Id, out var requestHandle)) { - d.cancellationTokenSource.Cancel(); + requestHandle.CancellationTokenSource.Cancel(); } continue; @@ -471,9 +496,7 @@ private void HandleRequest(in ReadOnlySequence request) return; } - var type = _requestProcessIdentifier.Identify(descriptor.Default); - _scheduler.Add(type, item.Notification.Method, RouteNotification(descriptor, item.Notification)); - + _requestInvoker.InvokeNotification(descriptor, item.Notification); } catch (JsonReaderException e) { @@ -492,123 +515,6 @@ private void HandleRequest(in ReadOnlySequence request) } } - private SchedulerDelegate RouteRequest(IRequestDescriptor descriptors, Request request) - { - // start request, create cts, etc - var cts = new CancellationTokenSource(); - _requests.TryAdd(request.Id, ( cts, descriptors )); - - return (contentModifiedToken, scheduler) => - Observable.Create( - observer => { - // ITS A RACE! - var sub = Observable.Amb( - contentModifiedToken.Select( - _ => { - _logger.LogTrace( - "Request {Id} was abandoned due to content be modified", request.Id - ); - return new ErrorResponse( - new ContentModified(request.Id, request.Method) - ); - } - ), - Observable.Timer(_requestTimeout, scheduler).Select( - _ => new ErrorResponse(new RequestCancelled(request.Id, request.Method)) - ), - Observable.FromAsync( - async ct => { - using var timer = _logger.TimeDebug( - "Processing request {Method} {ResponseId}", request.Method, - request.Id - ); - ct.Register(cts.Cancel); - // ObservableToToken(contentModifiedToken).Register(cts.Cancel); - try - { - return await _requestRouter.RouteRequest( - descriptors, request, cts.Token - ).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - _logger.LogTrace("Request {Id} was cancelled", request.Id); - return new RequestCancelled(request.Id, request.Method); - } - catch (RpcErrorException e) - { - _logger.LogCritical( - Events.UnhandledRequest, e, - "Failed to handle request {Method} {RequestId}", request.Method, - request.Id - ); - return new RpcError( - request.Id, request.Method, - new ErrorMessage(e.Code, e.Message, e.Error) - ); - } - catch (Exception e) - { - _logger.LogCritical( - Events.UnhandledRequest, e, - "Failed to handle request {Method} {RequestId}", request.Method, - request.Id - ); - return new InternalError(request.Id, request.Method, e.ToString()); - } - } - ) - ) - .Subscribe(observer); - return new CompositeDisposable { - sub, - Disposable.Create( - () => { - if (_requests.TryRemove(request.Id, out var v)) - { - v.cancellationTokenSource.Dispose(); - } - } - ) - }; - } - ) - .Select( - response => { - _outputHandler.Send(response.Value); - return Unit.Default; - } - ); - } - - private SchedulerDelegate RouteNotification(IRequestDescriptor descriptors, Notification notification) => - (_, scheduler) => - // ITS A RACE! - Observable.Amb( - Observable.Timer(_requestTimeout, scheduler) - .Select(_ => Unit.Default) - .Do( - _ => _logger.LogTrace("Notification was cancelled due to timeout") - ), - Observable.FromAsync( - async ct => { - using var timer = _logger.TimeDebug("Processing notification {Method}", notification.Method); - try - { - await _requestRouter.RouteNotification(descriptors, notification, ct).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - _logger.LogTrace("Notification was cancelled"); - } - catch (Exception e) - { - _logger.LogCritical(Events.UnhandledRequest, e, "Failed to handle request {Method}", notification.Method); - } - } - ) - ); - private static Exception DefaultErrorParser(string? method, ServerError error, CreateResponseExceptionHandler? customHandler) => error.Error.Code switch { ErrorCodes.ServerNotInitialized => new ServerNotInitializedException(error.Id), diff --git a/src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs b/src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs index 1859fd078..e61f8059c 100644 --- a/src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs +++ b/src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs @@ -47,13 +47,25 @@ internal static IContainer AddJsonRpcServerCore(this IContainer container, Js .Type(serviceKey: nameof(options.OutputScheduler)), reuse: Reuse.Singleton ); + + container.Register( + made: new Made.TypedMade().Parameters + .Type(serviceKey: nameof(options.MaximumRequestTimeout)) + .Type(serviceKey: nameof(options.SupportsContentModified)) + .Name("concurrency", serviceKey: nameof(options.Concurrency)), + reuse: Reuse.Singleton); + + if (!container.IsRegistered()) + { + container.Register( + made: new Made.TypedMade().Parameters + .Type(serviceKey: nameof(options.InputScheduler)), + reuse: Reuse.Singleton); + } + container.Register( made: new Made.TypedMade().Parameters .Type(serviceKey: nameof(options.Input)) - .Type(serviceKey: nameof(options.MaximumRequestTimeout)) - .Type(serviceKey: nameof(options.SupportsContentModified)) - .Name("concurrency", serviceKey: nameof(options.Concurrency)) - .Type(serviceKey: nameof(options.InputScheduler)) , reuse: Reuse.Singleton ); diff --git a/src/JsonRpc/RequestCancelled.cs b/src/JsonRpc/RequestCancelled.cs index 6bda757ef..a01c991f0 100644 --- a/src/JsonRpc/RequestCancelled.cs +++ b/src/JsonRpc/RequestCancelled.cs @@ -1,15 +1,15 @@ -using OmniSharp.Extensions.JsonRpc.Server; +using OmniSharp.Extensions.JsonRpc.Server; using OmniSharp.Extensions.JsonRpc.Server.Messages; namespace OmniSharp.Extensions.JsonRpc { public class RequestCancelled : RpcError { - internal RequestCancelled(string method) : base(null, method, new ErrorMessage(ErrorCodes.RequestCancelled, "Request Cancelled")) + public RequestCancelled(string method) : base(null, method, new ErrorMessage(ErrorCodes.RequestCancelled, "Request Cancelled")) { } - internal RequestCancelled(object id, string method) : base(id, method, new ErrorMessage(ErrorCodes.RequestCancelled, "Request Cancelled")) + public RequestCancelled(object id, string method) : base(id, method, new ErrorMessage(ErrorCodes.RequestCancelled, "Request Cancelled")) { } } diff --git a/src/JsonRpc/RequestInvocationHandle.cs b/src/JsonRpc/RequestInvocationHandle.cs new file mode 100644 index 000000000..9377a8886 --- /dev/null +++ b/src/JsonRpc/RequestInvocationHandle.cs @@ -0,0 +1,35 @@ +using System; +using System.Threading; +using OmniSharp.Extensions.JsonRpc.Server; + +namespace OmniSharp.Extensions.JsonRpc +{ + public class RequestInvocationHandle : IDisposable + { + private bool _disposed; + + public event Action? OnComplete; + + public RequestInvocationHandle(Request request) + { + Request = request; + CancellationTokenSource = new CancellationTokenSource(); + } + + public Request Request { get; } + + public CancellationTokenSource CancellationTokenSource { get; } + + public void Dispose() + { + if (_disposed) + { + return; + } + + _disposed = true; + + OnComplete?.Invoke(Request); + } + } +} diff --git a/src/JsonRpc/RequestInvoker.cs b/src/JsonRpc/RequestInvoker.cs new file mode 100644 index 000000000..51b721100 --- /dev/null +++ b/src/JsonRpc/RequestInvoker.cs @@ -0,0 +1,15 @@ +using System; +using OmniSharp.Extensions.JsonRpc.Server; +using Notification = OmniSharp.Extensions.JsonRpc.Server.Notification; + +namespace OmniSharp.Extensions.JsonRpc +{ + public abstract class RequestInvoker : IDisposable + { + public abstract RequestInvocationHandle InvokeRequest(IRequestDescriptor descriptor, Request request); + + public abstract void InvokeNotification(IRequestDescriptor descriptor, Notification notification); + + public abstract void Dispose(); + } +} diff --git a/src/JsonRpc/RequestInvokerOptions.cs b/src/JsonRpc/RequestInvokerOptions.cs new file mode 100644 index 000000000..47e88c0ec --- /dev/null +++ b/src/JsonRpc/RequestInvokerOptions.cs @@ -0,0 +1,23 @@ +using System; + +namespace OmniSharp.Extensions.JsonRpc +{ + public sealed class RequestInvokerOptions + { + public RequestInvokerOptions( + TimeSpan requestTimeout, + bool supportContentModified, + int concurrency) + { + RequestTimeout = requestTimeout; + SupportContentModified = supportContentModified; + Concurrency = concurrency > 1 ? concurrency : null; + } + + public TimeSpan RequestTimeout { get; } + + public bool SupportContentModified { get; } + + public int? Concurrency { get; } + } +} diff --git a/src/JsonRpc/TimeLoggerExtensions.cs b/src/JsonRpc/TimeLoggerExtensions.cs index 3a8066ac6..7a84b9b0d 100644 --- a/src/JsonRpc/TimeLoggerExtensions.cs +++ b/src/JsonRpc/TimeLoggerExtensions.cs @@ -5,7 +5,7 @@ namespace OmniSharp.Extensions.JsonRpc { - internal static class TimeLoggerExtensions + public static class TimeLoggerExtensions { private class Disposable : IDisposable { diff --git a/test/JsonRpc.Tests/InputHandlerTests.cs b/test/JsonRpc.Tests/InputHandlerTests.cs index 4eb72aac5..6efb24bdf 100644 --- a/test/JsonRpc.Tests/InputHandlerTests.cs +++ b/test/JsonRpc.Tests/InputHandlerTests.cs @@ -33,26 +33,21 @@ private InputHandler NewHandler( PipeReader inputStream, IOutputHandler outputHandler, IReceiver receiver, - IRequestProcessIdentifier requestProcessIdentifier, IRequestRouter requestRouter, ILoggerFactory loggerFactory, IResponseRouter responseRouter, - IScheduler? scheduler = null + RequestInvoker requestInvoker ) => new InputHandler( inputStream, outputHandler, receiver, - requestProcessIdentifier, requestRouter, responseRouter, + requestInvoker, loggerFactory, _unhandledException, - null, - TimeSpan.FromSeconds(30), - true, - null, - scheduler ?? TaskPoolScheduler.Default + null ); [Fact] @@ -65,9 +60,9 @@ public async Task Should_Pass_In_Requests() using var handler = NewHandler( pipe.Reader, outputHandler, receiver, - Substitute.For(), Substitute.For>(), - _loggerFactory, Substitute.For() + _loggerFactory, Substitute.For(), + Substitute.For() ); await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("Content-Length: 2\r\n\r\n{}")); @@ -91,9 +86,9 @@ public async Task Should_Handle_Multiple_Requests_At_Once() using var handler = NewHandler( pipe.Reader, outputHandler, receiver, - Substitute.For(), Substitute.For>(), - _loggerFactory, Substitute.For() + _loggerFactory, Substitute.For(), + Substitute.For() ); await pipe.Writer.WriteAsync( Encoding.UTF8.GetBytes("Content-Length: 2\r\n\r\n{}") @@ -133,9 +128,9 @@ public async Task Should_Handle_Different_Additional_Headers_and_Whitespace(stri using var handler = NewHandler( pipe.Reader, outputHandler, receiver, - Substitute.For(), Substitute.For>(), - _loggerFactory, Substitute.For() + _loggerFactory, Substitute.For(), + Substitute.For() ); await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(data)); @@ -161,9 +156,9 @@ public async Task Should_Handle_Multiple_Requests_Back_To_Back() using var handler = NewHandler( pipe.Reader, outputHandler, receiver, - Substitute.For(), Substitute.For>(), - _loggerFactory, Substitute.For() + _loggerFactory, Substitute.For(), + Substitute.For() ); var cts = new CancellationTokenSource(); @@ -194,9 +189,9 @@ public async Task Should_Handle_Multiple_Requests_In_Pieces() using var handler = NewHandler( pipe.Reader, outputHandler, receiver, - Substitute.For(), Substitute.For>(), - _loggerFactory, Substitute.For() + _loggerFactory, Substitute.For(), + Substitute.For() ); var cts = new CancellationTokenSource(); @@ -237,9 +232,9 @@ public async Task Should_Handle_Multiple_Chunked_Requests(string content) using var handler = NewHandler( pipe.Reader, outputHandler, receiver, - Substitute.For(), Substitute.For>(), - _loggerFactory, Substitute.For() + _loggerFactory, Substitute.For(), + Substitute.For() ); var cts = new CancellationTokenSource(); @@ -270,9 +265,9 @@ public async Task Should_Handle_Header_Terminiator_Being_Incomplete() using var handler = NewHandler( pipe.Reader, outputHandler, receiver, - Substitute.For(), Substitute.For>(), - _loggerFactory, Substitute.For() + _loggerFactory, Substitute.For(), + Substitute.For() ); var cts = new CancellationTokenSource(); @@ -323,9 +318,9 @@ public async Task ShouldPassAdditionalUtf8EncodedRequests(string data) using var handler = NewHandler( pipe.Reader, outputHandler, receiver, - Substitute.For(), Substitute.For>(), - _loggerFactory, Substitute.For() + _loggerFactory, Substitute.For(), + Substitute.For() ); var cts = new CancellationTokenSource(); @@ -363,13 +358,14 @@ public async Task Should_Parse_Logs(string name, Func createPipeRead var incomingRequestRouter = Substitute.For>(); var outputHandler = Substitute.For(); var responseRouter = Substitute.For(); + var requestInvoker = Substitute.For(); using var handler = NewHandler( reader, outputHandler, receiver, - new ParallelRequestProcessIdentifier(), incomingRequestRouter, _loggerFactory, - responseRouter + responseRouter, + requestInvoker ); var cts = new CancellationTokenSource(); @@ -382,20 +378,17 @@ public async Task Should_Parse_Logs(string name, Func createPipeRead { { var count = group.Count(x => x == "request"); - await incomingRequestRouter.Received(count).RouteRequest( + requestInvoker.Received(count).InvokeRequest( Arg.Any>(), - Arg.Is(n => group.Key == n.Method), - Arg.Any() + Arg.Is(n => group.Key == n.Method) ); } - { var count = group.Count(x => x == "notification"); - await incomingRequestRouter.Received(count).RouteNotification( + requestInvoker.Received(count).InvokeNotification( Arg.Any>(), - Arg.Is(n => group.Key == n.Method), - Arg.Any() + Arg.Is(n => group.Key == n.Method) ); } } diff --git a/test/Lsp.Integration.Tests/TypedCompletionTests.cs b/test/Lsp.Integration.Tests/TypedCompletionTests.cs index e39147815..ac238921a 100644 --- a/test/Lsp.Integration.Tests/TypedCompletionTests.cs +++ b/test/Lsp.Integration.Tests/TypedCompletionTests.cs @@ -228,7 +228,8 @@ public async Task Should_Resolve_With_Partial_Data_Capability() } ); - var item = await client.RequestCompletion(new CompletionParams()).SelectMany(z => z).Take(1).ToTask(CancellationToken); + var completionList = await client.RequestCompletion(new CompletionParams()); + var item = completionList.First(); item = await client.ResolveCompletion(item); item.Detail.Should().Be("resolved");