diff --git a/src/JsonRpc/Connection.cs b/src/JsonRpc/Connection.cs index f4b00bf96..546328a9e 100644 --- a/src/JsonRpc/Connection.cs +++ b/src/JsonRpc/Connection.cs @@ -1,5 +1,6 @@ using System; using System.IO.Pipelines; +using System.Reactive.Concurrency; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -22,6 +23,7 @@ public Connection( TimeSpan requestTimeout, bool supportContentModified, int concurrency, + IScheduler scheduler, CreateResponseExceptionHandler? getException = null ) => _inputHandler = new InputHandler( @@ -36,7 +38,8 @@ public Connection( getException, requestTimeout, supportContentModified, - concurrency > 1 ? (int?) concurrency : null + concurrency > 1 ? (int?) concurrency : null, + scheduler ); public void Open() diff --git a/src/JsonRpc/InputHandler.cs b/src/JsonRpc/InputHandler.cs index 3c63ea9e7..4ef3b8fb5 100644 --- a/src/JsonRpc/InputHandler.cs +++ b/src/JsonRpc/InputHandler.cs @@ -70,7 +70,8 @@ public InputHandler( CreateResponseExceptionHandler? getException, TimeSpan requestTimeout, bool supportContentModified, - int? concurrency + int? concurrency, + IScheduler scheduler ) { _pipeReader = pipeReader; @@ -87,7 +88,7 @@ public InputHandler( loggerFactory, supportContentModified, concurrency, - TaskPoolScheduler.Default + scheduler ); _headersBuffer = new Memory(new byte[HeadersFinishedLength]); _contentLengthBuffer = new Memory(new byte[ContentLengthLength]); diff --git a/src/JsonRpc/JsonRpcServerOptionsBase.cs b/src/JsonRpc/JsonRpcServerOptionsBase.cs index e1509b38b..f2759069a 100644 --- a/src/JsonRpc/JsonRpcServerOptionsBase.cs +++ b/src/JsonRpc/JsonRpcServerOptionsBase.cs @@ -3,6 +3,7 @@ using System.IO; using System.IO.Pipelines; using System.Linq; +using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reflection; using Microsoft.Extensions.DependencyInjection; @@ -31,6 +32,8 @@ public ILoggerFactory LoggerFactory public IEnumerable Assemblies { get; set; } = Enumerable.Empty(); public IRequestProcessIdentifier? RequestProcessIdentifier { get; set; } public int? Concurrency { get; set; } + public IScheduler InputScheduler { get; set; } = TaskPoolScheduler.Default; + public IScheduler OutputScheduler { get; set; } = TaskPoolScheduler.Default; public CreateResponseExceptionHandler? CreateResponseException { get; set; } public OnUnhandledExceptionHandler? OnUnhandledException { get; set; } public bool SupportsContentModified { get; set; } = true; diff --git a/src/JsonRpc/JsonRpcServerOptionsExtensions.cs b/src/JsonRpc/JsonRpcServerOptionsExtensions.cs index 742d1c736..6a1e7326c 100644 --- a/src/JsonRpc/JsonRpcServerOptionsExtensions.cs +++ b/src/JsonRpc/JsonRpcServerOptionsExtensions.cs @@ -1,3 +1,5 @@ +using System.Reactive.Concurrency; + namespace OmniSharp.Extensions.JsonRpc { public static class JsonRpcServerOptionsExtensions @@ -7,5 +9,41 @@ public static JsonRpcServerOptions WithSerializer(this JsonRpcServerOptions opti options.Serializer = serializer; return options; } + + /// + /// Sets both input and output schedulers to the same scheduler + /// + /// + /// + /// + public static JsonRpcServerOptions WithScheduler(this JsonRpcServerOptions options, IScheduler inputScheduler) + { + options.InputScheduler = options.OutputScheduler = inputScheduler; + return options; + } + + /// + /// Sets the scheduler used during reading input + /// + /// + /// + /// + public static JsonRpcServerOptions WithInputScheduler(this JsonRpcServerOptions options, IScheduler inputScheduler) + { + options.InputScheduler = inputScheduler; + return options; + } + + /// + /// Sets the scheduler use during writing output + /// + /// + /// + /// + public static JsonRpcServerOptions WithOutputScheduler(this JsonRpcServerOptions options, IScheduler outputScheduler) + { + options.OutputScheduler = outputScheduler; + return options; + } } } diff --git a/src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs b/src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs index b6b993b73..e4f3ccfb1 100644 --- a/src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs +++ b/src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs @@ -1,6 +1,7 @@ using System; using System.IO.Pipelines; using System.Linq; +using System.Reactive.Concurrency; using System.Threading; using System.Threading.Tasks; using DryIoc; @@ -32,6 +33,8 @@ internal static IContainer AddJsonRpcServerCore(this IContainer container, Js container.RegisterInstance(options.MaximumRequestTimeout, serviceKey: nameof(options.MaximumRequestTimeout)); container.RegisterInstance(options.SupportsContentModified, serviceKey: nameof(options.SupportsContentModified)); container.RegisterInstance(options.Concurrency ?? -1, serviceKey: nameof(options.Concurrency)); + container.RegisterInstance(options.InputScheduler, serviceKey: nameof(options.InputScheduler)); + container.RegisterInstance(options.OutputScheduler, serviceKey: nameof(options.OutputScheduler)); if (options.CreateResponseException != null) { container.RegisterInstance(options.CreateResponseException); @@ -40,7 +43,8 @@ internal static IContainer AddJsonRpcServerCore(this IContainer container, Js container.RegisterMany( nonPublicServiceTypes: true, made: Parameters.Of - .Type(serviceKey: nameof(options.Output)), + .Type(serviceKey: nameof(options.Output)) + .Type(serviceKey: nameof(options.OutputScheduler)), reuse: Reuse.Singleton ); container.Register( @@ -48,7 +52,9 @@ internal static IContainer AddJsonRpcServerCore(this IContainer container, Js .Type(serviceKey: nameof(options.Input)) .Type(serviceKey: nameof(options.MaximumRequestTimeout)) .Type(serviceKey: nameof(options.SupportsContentModified)) - .Name("concurrency", serviceKey: nameof(options.Concurrency)), + .Name("concurrency", serviceKey: nameof(options.Concurrency)) + .Type(serviceKey: nameof(options.InputScheduler)) + , reuse: Reuse.Singleton ); diff --git a/src/JsonRpc/OutputHandler.cs b/src/JsonRpc/OutputHandler.cs index d887ea8b8..e2320428d 100644 --- a/src/JsonRpc/OutputHandler.cs +++ b/src/JsonRpc/OutputHandler.cs @@ -63,21 +63,6 @@ private bool ShouldSend(object value) return _outputFilters.Any(z => z.ShouldOutput(value)); } - public OutputHandler( - PipeWriter pipeWriter, - ISerializer serializer, - IEnumerable outputFilters, - ILogger logger - ) : this( - pipeWriter, - serializer, - outputFilters, - TaskPoolScheduler.Default, - logger - ) - { - } - public void Send(object? value) { if (_queue.IsDisposed || _disposable.IsDisposed || value == null) return; diff --git a/src/Shared/LanguageProtocolServiceCollectionExtensions.cs b/src/Shared/LanguageProtocolServiceCollectionExtensions.cs index dee4aa9aa..4a0b3cd7c 100644 --- a/src/Shared/LanguageProtocolServiceCollectionExtensions.cs +++ b/src/Shared/LanguageProtocolServiceCollectionExtensions.cs @@ -1,5 +1,6 @@ using System; using System.Linq; +using System.Reactive.Concurrency; using DryIoc; using Microsoft.Extensions.DependencyInjection; using OmniSharp.Extensions.JsonRpc; diff --git a/test/JsonRpc.Tests/InputHandlerTests.cs b/test/JsonRpc.Tests/InputHandlerTests.cs index e44867d1a..4eb72aac5 100644 --- a/test/JsonRpc.Tests/InputHandlerTests.cs +++ b/test/JsonRpc.Tests/InputHandlerTests.cs @@ -2,6 +2,7 @@ using System.IO; using System.IO.Pipelines; using System.Linq; +using System.Reactive.Concurrency; using System.Reflection; using System.Text; using System.Threading; @@ -35,7 +36,8 @@ private InputHandler NewHandler( IRequestProcessIdentifier requestProcessIdentifier, IRequestRouter requestRouter, ILoggerFactory loggerFactory, - IResponseRouter responseRouter + IResponseRouter responseRouter, + IScheduler? scheduler = null ) => new InputHandler( inputStream, @@ -49,7 +51,8 @@ IResponseRouter responseRouter null, TimeSpan.FromSeconds(30), true, - null + null, + scheduler ?? TaskPoolScheduler.Default ); [Fact]