Skip to content

Added ability to define the scheduler used for both input and output #458

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/JsonRpc/Connection.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.IO.Pipelines;
using System.Reactive.Concurrency;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

Expand All @@ -22,6 +23,7 @@ public Connection(
TimeSpan requestTimeout,
bool supportContentModified,
int concurrency,
IScheduler scheduler,
CreateResponseExceptionHandler? getException = null
) =>
_inputHandler = new InputHandler(
Expand All @@ -36,7 +38,8 @@ public Connection(
getException,
requestTimeout,
supportContentModified,
concurrency > 1 ? (int?) concurrency : null
concurrency > 1 ? (int?) concurrency : null,
scheduler
);

public void Open()
Expand Down
5 changes: 3 additions & 2 deletions src/JsonRpc/InputHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public InputHandler(
CreateResponseExceptionHandler? getException,
TimeSpan requestTimeout,
bool supportContentModified,
int? concurrency
int? concurrency,
IScheduler scheduler
)
{
_pipeReader = pipeReader;
Expand All @@ -87,7 +88,7 @@ public InputHandler(
loggerFactory,
supportContentModified,
concurrency,
TaskPoolScheduler.Default
scheduler
);
_headersBuffer = new Memory<byte>(new byte[HeadersFinishedLength]);
_contentLengthBuffer = new Memory<byte>(new byte[ContentLengthLength]);
Expand Down
3 changes: 3 additions & 0 deletions src/JsonRpc/JsonRpcServerOptionsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reflection;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -31,6 +32,8 @@ public ILoggerFactory LoggerFactory
public IEnumerable<Assembly> Assemblies { get; set; } = Enumerable.Empty<Assembly>();
public IRequestProcessIdentifier? RequestProcessIdentifier { get; set; }
public int? Concurrency { get; set; }
public IScheduler InputScheduler { get; set; } = TaskPoolScheduler.Default;
public IScheduler OutputScheduler { get; set; } = TaskPoolScheduler.Default;
public CreateResponseExceptionHandler? CreateResponseException { get; set; }
public OnUnhandledExceptionHandler? OnUnhandledException { get; set; }
public bool SupportsContentModified { get; set; } = true;
Expand Down
38 changes: 38 additions & 0 deletions src/JsonRpc/JsonRpcServerOptionsExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Reactive.Concurrency;

namespace OmniSharp.Extensions.JsonRpc
{
public static class JsonRpcServerOptionsExtensions
Expand All @@ -7,5 +9,41 @@ public static JsonRpcServerOptions WithSerializer(this JsonRpcServerOptions opti
options.Serializer = serializer;
return options;
}

/// <summary>
/// Sets both input and output schedulers to the same scheduler
/// </summary>
/// <param name="options"></param>
/// <param name="inputScheduler"></param>
/// <returns></returns>
public static JsonRpcServerOptions WithScheduler(this JsonRpcServerOptions options, IScheduler inputScheduler)
{
options.InputScheduler = options.OutputScheduler = inputScheduler;
return options;
}

/// <summary>
/// Sets the scheduler used during reading input
/// </summary>
/// <param name="options"></param>
/// <param name="inputScheduler"></param>
/// <returns></returns>
public static JsonRpcServerOptions WithInputScheduler(this JsonRpcServerOptions options, IScheduler inputScheduler)
{
options.InputScheduler = inputScheduler;
return options;
}

/// <summary>
/// Sets the scheduler use during writing output
/// </summary>
/// <param name="options"></param>
/// <param name="outputScheduler"></param>
/// <returns></returns>
public static JsonRpcServerOptions WithOutputScheduler(this JsonRpcServerOptions options, IScheduler outputScheduler)
{
options.OutputScheduler = outputScheduler;
return options;
}
}
}
10 changes: 8 additions & 2 deletions src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.IO.Pipelines;
using System.Linq;
using System.Reactive.Concurrency;
using System.Threading;
using System.Threading.Tasks;
using DryIoc;
Expand Down Expand Up @@ -32,6 +33,8 @@ internal static IContainer AddJsonRpcServerCore<T>(this IContainer container, Js
container.RegisterInstance(options.MaximumRequestTimeout, serviceKey: nameof(options.MaximumRequestTimeout));
container.RegisterInstance(options.SupportsContentModified, serviceKey: nameof(options.SupportsContentModified));
container.RegisterInstance(options.Concurrency ?? -1, serviceKey: nameof(options.Concurrency));
container.RegisterInstance(options.InputScheduler, serviceKey: nameof(options.InputScheduler));
container.RegisterInstance(options.OutputScheduler, serviceKey: nameof(options.OutputScheduler));
if (options.CreateResponseException != null)
{
container.RegisterInstance(options.CreateResponseException);
Expand All @@ -40,15 +43,18 @@ internal static IContainer AddJsonRpcServerCore<T>(this IContainer container, Js
container.RegisterMany<OutputHandler>(
nonPublicServiceTypes: true,
made: Parameters.Of
.Type<PipeWriter>(serviceKey: nameof(options.Output)),
.Type<PipeWriter>(serviceKey: nameof(options.Output))
.Type<IScheduler>(serviceKey: nameof(options.OutputScheduler)),
reuse: Reuse.Singleton
);
container.Register<Connection>(
made: new Made.TypedMade<Connection>().Parameters
.Type<PipeReader>(serviceKey: nameof(options.Input))
.Type<TimeSpan>(serviceKey: nameof(options.MaximumRequestTimeout))
.Type<bool>(serviceKey: nameof(options.SupportsContentModified))
.Name("concurrency", serviceKey: nameof(options.Concurrency)),
.Name("concurrency", serviceKey: nameof(options.Concurrency))
.Type<IScheduler>(serviceKey: nameof(options.InputScheduler))
,
reuse: Reuse.Singleton
);

Expand Down
15 changes: 0 additions & 15 deletions src/JsonRpc/OutputHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,6 @@ private bool ShouldSend(object value)
return _outputFilters.Any(z => z.ShouldOutput(value));
}

public OutputHandler(
PipeWriter pipeWriter,
ISerializer serializer,
IEnumerable<IOutputFilter> outputFilters,
ILogger<OutputHandler> logger
) : this(
pipeWriter,
serializer,
outputFilters,
TaskPoolScheduler.Default,
logger
)
{
}

public void Send(object? value)
{
if (_queue.IsDisposed || _disposable.IsDisposed || value == null) return;
Expand Down
1 change: 1 addition & 0 deletions src/Shared/LanguageProtocolServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Reactive.Concurrency;
using DryIoc;
using Microsoft.Extensions.DependencyInjection;
using OmniSharp.Extensions.JsonRpc;
Expand Down
7 changes: 5 additions & 2 deletions test/JsonRpc.Tests/InputHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reflection;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -35,7 +36,8 @@ private InputHandler NewHandler(
IRequestProcessIdentifier requestProcessIdentifier,
IRequestRouter<IHandlerDescriptor?> requestRouter,
ILoggerFactory loggerFactory,
IResponseRouter responseRouter
IResponseRouter responseRouter,
IScheduler? scheduler = null
) =>
new InputHandler(
inputStream,
Expand All @@ -49,7 +51,8 @@ IResponseRouter responseRouter
null,
TimeSpan.FromSeconds(30),
true,
null
null,
scheduler ?? TaskPoolScheduler.Default
);

[Fact]
Expand Down