diff --git a/Directory.Build.targets b/Directory.Build.targets
index 46eeba140..42ef6cfdb 100644
--- a/Directory.Build.targets
+++ b/Directory.Build.targets
@@ -1,52 +1,53 @@
-
-
-
- $(BaseIntermediateOutputPath)\GeneratedFiles
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
+
+
+
+ $(BaseIntermediateOutputPath)\GeneratedFiles
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Client/LanguageClient.cs b/src/Client/LanguageClient.cs
index e2304fb25..0d5ba8b19 100644
--- a/src/Client/LanguageClient.cs
+++ b/src/Client/LanguageClient.cs
@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
+using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
@@ -48,6 +49,7 @@ public class LanguageClient : JsonRpcServerBase, ILanguageClient
private readonly IEnumerable _initializedHandlers;
private readonly LspSerializer _serializer;
private readonly InstanceHasStarted _instanceHasStarted;
+ private readonly IScheduler _scheduler;
private readonly IResponseRouter _responseRouter;
private readonly ISubject _initializeComplete = new AsyncSubject();
private readonly CompositeDisposable _disposable = new CompositeDisposable();
@@ -150,7 +152,8 @@ internal LanguageClient(
IEnumerable initializedDelegates,
IEnumerable initializedHandlers,
LspSerializer serializer,
- InstanceHasStarted instanceHasStarted
+ InstanceHasStarted instanceHasStarted,
+ IScheduler scheduler
) : base(handlerCollection, responseRouter)
{
_connection = connection;
@@ -179,6 +182,7 @@ InstanceHasStarted instanceHasStarted
_initializedHandlers = initializedHandlers;
_serializer = serializer;
_instanceHasStarted = instanceHasStarted;
+ _scheduler = scheduler;
_concurrency = options.Value.Concurrency;
// We need to at least create Window here in case any handler does loggin in their constructor
@@ -262,6 +266,7 @@ await LanguageProtocolEventingHelper.Run(
_initializeHandlers.Union(_collection.Select(z => z.Handler).OfType()),
(handler, ct) => handler.OnInitialize(this, @params, ct),
_concurrency,
+ _scheduler,
token
).ConfigureAwait(false);
@@ -281,6 +286,7 @@ await LanguageProtocolEventingHelper.Run(
_initializedHandlers.Union(_collection.Select(z => z.Handler).OfType()),
(handler, ct) => handler.OnInitialized(this, @params, serverParams, ct),
_concurrency,
+ _scheduler,
token
).ConfigureAwait(false);
@@ -299,6 +305,7 @@ await LanguageProtocolEventingHelper.Run(
_startedHandlers.Union(_collection.Select(z => z.Handler).OfType()),
(handler, ct) => handler.OnStarted(this, ct),
_concurrency,
+ _scheduler,
token
).ConfigureAwait(false);
@@ -395,7 +402,7 @@ private Supports UseOrTryAndFindCapability(Supports supports) where T :
bool IResponseRouter.TryGetRequest(long id, [NotNullWhen(true)] out string method, [NotNullWhen(true)] out TaskCompletionSource pendingTask) =>
_responseRouter.TryGetRequest(id, out method, out pendingTask);
- public Task WasStarted => _initializeComplete.ToTask();
+ public Task WasStarted => _initializeComplete.ToTask(_scheduler);
public void Dispose()
{
diff --git a/src/Client/LanguageClientOptionsExtensions.cs b/src/Client/LanguageClientOptionsExtensions.cs
index 2c10ae955..32fe472e1 100644
--- a/src/Client/LanguageClientOptionsExtensions.cs
+++ b/src/Client/LanguageClientOptionsExtensions.cs
@@ -1,4 +1,5 @@
using System;
+using System.Reactive.Concurrency;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -87,6 +88,54 @@ public static LanguageClientOptions WithClientCapabilities(this LanguageClientOp
return options;
}
+ ///
+ /// Sets both input and output schedulers to the same scheduler
+ ///
+ ///
+ ///
+ ///
+ public static LanguageClientOptions WithScheduler(this LanguageClientOptions options, IScheduler inputScheduler)
+ {
+ options.InputScheduler = options.OutputScheduler = options.DefaultScheduler = inputScheduler;
+ return options;
+ }
+
+ ///
+ /// Sets the scheduler used during reading input
+ ///
+ ///
+ ///
+ ///
+ public static LanguageClientOptions WithInputScheduler(this LanguageClientOptions options, IScheduler inputScheduler)
+ {
+ options.InputScheduler = inputScheduler;
+ return options;
+ }
+
+ ///
+ /// Sets the default scheduler to be used when scheduling other tasks
+ ///
+ ///
+ ///
+ ///
+ public static LanguageClientOptions WithDefaultScheduler(this LanguageClientOptions options, IScheduler defaultScheduler)
+ {
+ options.DefaultScheduler = defaultScheduler;
+ return options;
+ }
+
+ ///
+ /// Sets the scheduler use during writing output
+ ///
+ ///
+ ///
+ ///
+ public static LanguageClientOptions WithOutputScheduler(this LanguageClientOptions options, IScheduler outputScheduler)
+ {
+ options.OutputScheduler = outputScheduler;
+ return options;
+ }
+
public static LanguageClientOptions OnInitialize(this LanguageClientOptions options, OnLanguageClientInitializeDelegate @delegate)
{
options.Services.AddSingleton(@delegate);
diff --git a/src/Client/LanguageClientRegistrationManager.cs b/src/Client/LanguageClientRegistrationManager.cs
index 828a7b596..b6703e4c3 100644
--- a/src/Client/LanguageClientRegistrationManager.cs
+++ b/src/Client/LanguageClientRegistrationManager.cs
@@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
+using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
@@ -26,7 +27,7 @@ internal class LanguageClientRegistrationManager : IRegisterCapabilityHandler, I
private readonly ILspHandlerTypeDescriptorProvider _handlerTypeDescriptorProvider;
private readonly ILogger _logger;
private readonly ConcurrentDictionary _registrations;
- private readonly ReplaySubject> _registrationSubject = new ReplaySubject>(1);
+ private readonly ReplaySubject> _registrationSubject = new ReplaySubject>(1, Scheduler.Immediate);
public LanguageClientRegistrationManager(
ISerializer serializer,
diff --git a/src/Client/LanguageClientWorkspaceFoldersManager.cs b/src/Client/LanguageClientWorkspaceFoldersManager.cs
index c768335bc..1397dad26 100644
--- a/src/Client/LanguageClientWorkspaceFoldersManager.cs
+++ b/src/Client/LanguageClientWorkspaceFoldersManager.cs
@@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
+using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
@@ -25,7 +26,7 @@ public LanguageClientWorkspaceFoldersManager(IWorkspaceLanguageClient client, IE
{
_client = client;
_workspaceFolders = new ConcurrentDictionary(DocumentUri.Comparer);
- _workspaceFoldersSubject = new ReplaySubject>(1);
+ _workspaceFoldersSubject = new ReplaySubject>(1, Scheduler.Immediate);
foreach (var folder in workspaceFolders)
{
diff --git a/src/Dap.Client/DebugAdapterClient.cs b/src/Dap.Client/DebugAdapterClient.cs
index 5d8854b95..f2ec06604 100644
--- a/src/Dap.Client/DebugAdapterClient.cs
+++ b/src/Dap.Client/DebugAdapterClient.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
@@ -30,6 +31,7 @@ public class DebugAdapterClient : JsonRpcServerBase, IDebugAdapterClient, IDebug
private readonly IEnumerable _startedDelegates;
private readonly IEnumerable _startedHandlers;
private readonly InstanceHasStarted _instanceHasStarted;
+ private readonly IScheduler _scheduler;
private readonly CompositeDisposable _disposable = new CompositeDisposable();
private readonly Connection _connection;
private readonly DapReceiver _receiver;
@@ -97,7 +99,8 @@ internal DebugAdapterClient(
IEnumerable initializedDelegates,
IEnumerable initializedHandlers,
IEnumerable startedHandlers,
- InstanceHasStarted instanceHasStarted
+ InstanceHasStarted instanceHasStarted,
+ IScheduler scheduler
) : base(collection, responseRouter)
{
_settingsBag = settingsBag;
@@ -114,6 +117,7 @@ InstanceHasStarted instanceHasStarted
_initializedHandlers = initializedHandlers;
_startedHandlers = startedHandlers;
_instanceHasStarted = instanceHasStarted;
+ _scheduler = scheduler;
_concurrency = options.Value.Concurrency;
_disposable.Add(collection.Add(this));
@@ -127,6 +131,7 @@ await DebugAdapterEventingHelper.Run(
_initializeHandlers.Union(_collection.Select(z => z.Handler).OfType()),
(handler, ct) => handler.OnInitialize(this, ClientSettings, ct),
_concurrency,
+ _scheduler,
token
).ConfigureAwait(false);
@@ -145,10 +150,11 @@ await DebugAdapterEventingHelper.Run(
_initializedHandlers.Union(_collection.Select(z => z.Handler).OfType()),
(handler, ct) => handler.OnInitialized(this, ClientSettings, ServerSettings, ct),
_concurrency,
+ _scheduler,
token
).ConfigureAwait(false);
- await _initializedComplete.ToTask(token);
+ await _initializedComplete.ToTask(token, _scheduler);
await DebugAdapterEventingHelper.Run(
_startedDelegates,
@@ -156,6 +162,7 @@ await DebugAdapterEventingHelper.Run(
_startedHandlers.Union(_collection.Select(z => z.Handler).OfType()),
(handler, ct) => handler.OnStarted(this, ct),
_concurrency,
+ _scheduler,
token
).ConfigureAwait(false);
diff --git a/src/Dap.Client/DebugAdapterClientOptionsExtensions.cs b/src/Dap.Client/DebugAdapterClientOptionsExtensions.cs
index 4b3f8236c..6c10f6646 100644
--- a/src/Dap.Client/DebugAdapterClientOptionsExtensions.cs
+++ b/src/Dap.Client/DebugAdapterClientOptionsExtensions.cs
@@ -1,4 +1,5 @@
using System;
+using System.Reactive.Concurrency;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -21,6 +22,54 @@ public static DebugAdapterClientOptions WithRequestProcessIdentifier(this DebugA
return options;
}
+ ///
+ /// Sets both input and output schedulers to the same scheduler
+ ///
+ ///
+ ///
+ ///
+ public static DebugAdapterClientOptions WithScheduler(this DebugAdapterClientOptions options, IScheduler inputScheduler)
+ {
+ options.InputScheduler = options.OutputScheduler = options.DefaultScheduler = inputScheduler;
+ return options;
+ }
+
+ ///
+ /// Sets the scheduler used during reading input
+ ///
+ ///
+ ///
+ ///
+ public static DebugAdapterClientOptions WithInputScheduler(this DebugAdapterClientOptions options, IScheduler inputScheduler)
+ {
+ options.InputScheduler = inputScheduler;
+ return options;
+ }
+
+ ///
+ /// Sets the default scheduler to be used when scheduling other tasks
+ ///
+ ///
+ ///
+ ///
+ public static DebugAdapterClientOptions WithDefaultScheduler(this DebugAdapterClientOptions options, IScheduler defaultScheduler)
+ {
+ options.DefaultScheduler = defaultScheduler;
+ return options;
+ }
+
+ ///
+ /// Sets the scheduler use during writing output
+ ///
+ ///
+ ///
+ ///
+ public static DebugAdapterClientOptions WithOutputScheduler(this DebugAdapterClientOptions options, IScheduler outputScheduler)
+ {
+ options.OutputScheduler = outputScheduler;
+ return options;
+ }
+
public static DebugAdapterClientOptions OnInitialize(this DebugAdapterClientOptions options, OnDebugAdapterClientInitializeDelegate @delegate)
{
options.Services.AddSingleton(@delegate);
diff --git a/src/Dap.Client/ProgressObservable.cs b/src/Dap.Client/ProgressObservable.cs
index c961689b3..8860ba5c9 100644
--- a/src/Dap.Client/ProgressObservable.cs
+++ b/src/Dap.Client/ProgressObservable.cs
@@ -1,4 +1,5 @@
using System;
+using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using OmniSharp.Extensions.DebugAdapter.Protocol;
@@ -14,7 +15,7 @@ internal class ProgressObservable : IProgressObservable, IObserver(1);
+ _dataSubject = new ReplaySubject(1, Scheduler.Immediate);
_disposable = new CompositeDisposable { Disposable.Create(_dataSubject.OnCompleted) };
ProgressToken = token;
diff --git a/src/Dap.Protocol/Feature/Events/ProgressFeature.cs b/src/Dap.Protocol/Feature/Events/ProgressFeature.cs
index f0df93eac..16ebd60a5 100644
--- a/src/Dap.Protocol/Feature/Events/ProgressFeature.cs
+++ b/src/Dap.Protocol/Feature/Events/ProgressFeature.cs
@@ -23,7 +23,7 @@ public abstract record ProgressEvent
public string? Message { get; init; }
}
- [Parallel]
+ [Serial]
[Method(EventNames.ProgressStart, Direction.ServerToClient)]
[
GenerateHandler,
@@ -60,7 +60,7 @@ public record ProgressStartEvent : ProgressEvent, IRequest
public int? Percentage { get; init; }
}
- [Parallel]
+ [Serial]
[Method(EventNames.ProgressUpdate, Direction.ServerToClient)]
[
GenerateHandler,
@@ -76,7 +76,7 @@ public record ProgressUpdateEvent : ProgressEvent, IRequest
public double? Percentage { get; init; }
}
- [Parallel]
+ [Serial]
[Method(EventNames.ProgressEnd, Direction.ServerToClient)]
[
GenerateHandler,
diff --git a/src/Dap.Server/DebugAdapterServer.cs b/src/Dap.Server/DebugAdapterServer.cs
index 880b6e7db..7e6cde49a 100644
--- a/src/Dap.Server/DebugAdapterServer.cs
+++ b/src/Dap.Server/DebugAdapterServer.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
@@ -30,6 +31,7 @@ public class DebugAdapterServer : JsonRpcServerBase, IDebugAdapterServer, IDebug
private readonly IEnumerable _startedDelegates;
private readonly IEnumerable _startedHandlers;
private readonly InstanceHasStarted _instanceHasStarted;
+ private readonly IScheduler _scheduler;
private readonly IServiceProvider _serviceProvider;
private readonly CompositeDisposable _disposable = new CompositeDisposable();
private readonly Connection _connection;
@@ -100,7 +102,8 @@ internal DebugAdapterServer(
IEnumerable initializeHandlers,
IEnumerable initializedHandlers,
IEnumerable startedHandlers,
- InstanceHasStarted instanceHasStarted
+ InstanceHasStarted instanceHasStarted,
+ IScheduler scheduler
) : base(collection, responseRouter)
{
_capabilities = capabilities;
@@ -117,6 +120,7 @@ InstanceHasStarted instanceHasStarted
_initializedHandlers = initializedHandlers;
_startedHandlers = startedHandlers;
_instanceHasStarted = instanceHasStarted;
+ _scheduler = scheduler;
_concurrency = options.Value.Concurrency;
_disposable.Add(collection.Add(this));
@@ -142,7 +146,7 @@ public async Task Initialize(CancellationToken token)
_connection.Open();
try
{
- _initializingTask = _initializeComplete.ToTask(token);
+ _initializingTask = _initializeComplete.ToTask(token, _scheduler);
await _initializingTask.ConfigureAwait(false);
await DebugAdapterEventingHelper.Run(
_startedDelegates,
@@ -150,6 +154,7 @@ await DebugAdapterEventingHelper.Run(
_startedHandlers.Union(_collection.Select(z => z.Handler).OfType()),
(handler, ct) => handler.OnStarted(this, ct),
_concurrency,
+ _scheduler,
token
).ConfigureAwait(false);
_instanceHasStarted.Started = true;
@@ -181,6 +186,7 @@ await DebugAdapterEventingHelper.Run(
_initializeHandlers.Union(_collection.Select(z => z.Handler).OfType()),
(handler, ct) => handler.OnInitialize(this, request, ct),
_concurrency,
+ _scheduler,
cancellationToken
).ConfigureAwait(false);
@@ -231,6 +237,7 @@ await DebugAdapterEventingHelper.Run(
_initializedHandlers.Union(_collection.Select(z => z.Handler).OfType()),
(handler, ct) => handler.OnInitialized(this, request, response, ct),
_concurrency,
+ _scheduler,
cancellationToken
).ConfigureAwait(false);
diff --git a/src/Dap.Server/DebugAdapterServerOptionsExtensions.cs b/src/Dap.Server/DebugAdapterServerOptionsExtensions.cs
index 89aa784bb..f25ed04e0 100644
--- a/src/Dap.Server/DebugAdapterServerOptionsExtensions.cs
+++ b/src/Dap.Server/DebugAdapterServerOptionsExtensions.cs
@@ -1,4 +1,5 @@
using System;
+using System.Reactive.Concurrency;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -21,6 +22,54 @@ public static DebugAdapterServerOptions WithRequestProcessIdentifier(this DebugA
return options;
}
+ ///
+ /// Sets both input and output schedulers to the same scheduler
+ ///
+ ///
+ ///
+ ///
+ public static DebugAdapterServerOptions WithScheduler(this DebugAdapterServerOptions options, IScheduler inputScheduler)
+ {
+ options.InputScheduler = options.OutputScheduler = options.DefaultScheduler = inputScheduler;
+ return options;
+ }
+
+ ///
+ /// Sets the scheduler used during reading input
+ ///
+ ///
+ ///
+ ///
+ public static DebugAdapterServerOptions WithInputScheduler(this DebugAdapterServerOptions options, IScheduler inputScheduler)
+ {
+ options.InputScheduler = inputScheduler;
+ return options;
+ }
+
+ ///
+ /// Sets the default scheduler to be used when scheduling other tasks
+ ///
+ ///
+ ///
+ ///
+ public static DebugAdapterServerOptions WithDefaultScheduler(this DebugAdapterServerOptions options, IScheduler defaultScheduler)
+ {
+ options.DefaultScheduler = defaultScheduler;
+ return options;
+ }
+
+ ///
+ /// Sets the scheduler use during writing output
+ ///
+ ///
+ ///
+ ///
+ public static DebugAdapterServerOptions WithOutputScheduler(this DebugAdapterServerOptions options, IScheduler outputScheduler)
+ {
+ options.OutputScheduler = outputScheduler;
+ return options;
+ }
+
public static DebugAdapterServerOptions OnInitialize(this DebugAdapterServerOptions options, OnDebugAdapterServerInitializeDelegate @delegate)
{
options.Services.AddSingleton(@delegate);
diff --git a/src/Dap.Shared/DebugAdapterEventingHelper.cs b/src/Dap.Shared/DebugAdapterEventingHelper.cs
index 60176311f..4db62e462 100644
--- a/src/Dap.Shared/DebugAdapterEventingHelper.cs
+++ b/src/Dap.Shared/DebugAdapterEventingHelper.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
@@ -16,6 +17,7 @@ public static Task Run(
IEnumerable handlers,
Func executeHandler,
int? concurrency,
+ IScheduler scheduler,
CancellationToken cancellationToken
)
{
@@ -28,13 +30,13 @@ CancellationToken cancellationToken
{
return events.Merge(concurrency.Value)
.LastOrDefaultAsync()
- .ToTask(cancellationToken);
+ .ToTask(cancellationToken, scheduler);
}
return events
.Merge()
.LastOrDefaultAsync()
- .ToTask(cancellationToken);
+ .ToTask(cancellationToken, scheduler);
}
}
}
diff --git a/src/Dap.Testing/DebugAdapterProtocolTestBase.cs b/src/Dap.Testing/DebugAdapterProtocolTestBase.cs
index 92eabb3bf..2cbe87d4e 100644
--- a/src/Dap.Testing/DebugAdapterProtocolTestBase.cs
+++ b/src/Dap.Testing/DebugAdapterProtocolTestBase.cs
@@ -45,10 +45,11 @@ Action serverOptionsAction
options
.WithLoggerFactory(TestOptions.ClientLoggerFactory)
.ConfigureLogging(
- x => {
- x.SetMinimumLevel(LogLevel.Trace);
- }
+ x => { x.SetMinimumLevel(LogLevel.Trace); }
)
+ .WithInputScheduler(options.InputScheduler)
+ .WithOutputScheduler(options.OutputScheduler)
+ .WithDefaultScheduler(options.DefaultScheduler)
.Services
.AddTransient(typeof(IPipelineBehavior<,>), typeof(SettlePipeline<,>))
.AddSingleton(ClientEvents as IRequestSettler);
@@ -61,11 +62,10 @@ Action serverOptionsAction
options => {
options
.WithLoggerFactory(TestOptions.ServerLoggerFactory)
- .ConfigureLogging(
- x => {
- x.SetMinimumLevel(LogLevel.Trace);
- }
- )
+ .ConfigureLogging(x => { x.SetMinimumLevel(LogLevel.Trace); })
+ .WithInputScheduler(options.InputScheduler)
+ .WithOutputScheduler(options.OutputScheduler)
+ .WithDefaultScheduler(options.DefaultScheduler)
.Services
.AddTransient(typeof(IPipelineBehavior<,>), typeof(SettlePipeline<,>))
.AddSingleton(ServerEvents as IRequestSettler);
@@ -77,10 +77,9 @@ Action serverOptionsAction
Disposable.Add(_client);
Disposable.Add(_server);
- return await Observable.FromAsync(_client.Initialize).ForkJoin(
- Observable.FromAsync(_server.Initialize),
- (_, _) => ( _client, _server )
- ).ToTask(CancellationToken).ConfigureAwait(false);
+ return await Observable.FromAsync(_client.Initialize)
+ .ForkJoin(Observable.FromAsync(_server.Initialize), (_, _) => ( _client, _server ))
+ .ToTask(CancellationToken);
}
}
}
diff --git a/src/Dap.Testing/DebugAdapterServerTestBase.cs b/src/Dap.Testing/DebugAdapterServerTestBase.cs
index 7654f2cf1..1c53d4354 100644
--- a/src/Dap.Testing/DebugAdapterServerTestBase.cs
+++ b/src/Dap.Testing/DebugAdapterServerTestBase.cs
@@ -37,6 +37,9 @@ protected virtual async Task InitializeClient(Action), typeof(SettlePipeline<,>))
.AddSingleton(Events as IRequestSettler);
diff --git a/src/JsonRpc.Generators/Helpers.cs b/src/JsonRpc.Generators/Helpers.cs
index ebed8a799..f7559140d 100644
--- a/src/JsonRpc.Generators/Helpers.cs
+++ b/src/JsonRpc.Generators/Helpers.cs
@@ -802,8 +802,40 @@ public static ArrowExpressionClauseSyntax GetRequestInvokeExpression() =>
)
);
- public static ArrowExpressionClauseSyntax GetPartialInvokeExpression(TypeSyntax responseType) =>
- ArrowExpressionClause(
+ public static ArrowExpressionClauseSyntax GetPartialInvokeExpression(TypeSyntax responseType, TypeSyntax? partialItemType)
+ {
+ var realResponseType = responseType is NullableTypeSyntax nts ? nts.ElementType : responseType;
+ var factoryArgument = Argument(
+ SimpleLambdaExpression(
+ Parameter(Identifier("value")),
+ ObjectCreationExpression(realResponseType)
+ .WithArgumentList(ArgumentList(SingletonSeparatedList(Argument(IdentifierName("value")))))
+ )
+ );
+ var arguments = new[] {
+ Argument(
+ IdentifierName(@"request")
+ ),
+ factoryArgument,
+ Argument(IdentifierName("cancellationToken"))
+ };
+ if (partialItemType is {})
+ {
+ var realPartialItemType = partialItemType is NullableTypeSyntax nts2 ? nts2.ElementType : partialItemType;
+ arguments = new[] {
+ arguments[0],
+ arguments[1],
+ Argument(
+ SimpleLambdaExpression(
+ Parameter(Identifier("value")),
+ ObjectCreationExpression(realPartialItemType)
+ .WithArgumentList(ArgumentList(SingletonSeparatedList(Argument(IdentifierName("value")))))
+ )
+ ),
+ arguments[2]
+ };
+ }
+ return ArrowExpressionClause(
InvocationExpression(
MemberAccessExpression(
SyntaxKind.SimpleMemberAccessExpression,
@@ -815,26 +847,9 @@ public static ArrowExpressionClauseSyntax GetPartialInvokeExpression(TypeSyntax
IdentifierName("MonitorUntil")
)
)
- .WithArgumentList(
- ArgumentList(
- SeparatedList(
- new[] {
- Argument(
- IdentifierName(@"request")
- ),
- Argument(
- SimpleLambdaExpression(
- Parameter(Identifier("value")),
- ObjectCreationExpression(responseType is NullableTypeSyntax nts ? nts.ElementType : responseType)
- .WithArgumentList(ArgumentList(SingletonSeparatedList(Argument(IdentifierName("value")))))
- )
- ),
- Argument(IdentifierName("cancellationToken"))
- }
- )
- )
- )
+ .WithArgumentList(ArgumentList(SeparatedList(arguments)))
);
+ }
public static string GetExtensionClassName(INamedTypeSymbol symbol) => SpecialCasedHandlerFullName(symbol).Split('.').Last() + "Extensions";
diff --git a/src/JsonRpc.Generators/Strategies/SendMethodRequestStrategy.cs b/src/JsonRpc.Generators/Strategies/SendMethodRequestStrategy.cs
index 47eb046d8..fdcf99139 100644
--- a/src/JsonRpc.Generators/Strategies/SendMethodRequestStrategy.cs
+++ b/src/JsonRpc.Generators/Strategies/SendMethodRequestStrategy.cs
@@ -59,7 +59,7 @@ public IEnumerable Apply(ExtensionMethodContext extensi
)
)
.WithParameterList(parameterList)
- .WithExpressionBody(Helpers.GetPartialInvokeExpression(request.Response.Syntax))
+ .WithExpressionBody(Helpers.GetPartialInvokeExpression(request.Response.Syntax, request.PartialItem.Syntax))
.WithSemicolonToken(Token(SyntaxKind.SemicolonToken));
yield break;
}
@@ -91,7 +91,7 @@ public IEnumerable Apply(ExtensionMethodContext extensi
)
)
.WithParameterList(parameterList)
- .WithExpressionBody(Helpers.GetPartialInvokeExpression(request.Response.Syntax))
+ .WithExpressionBody(Helpers.GetPartialInvokeExpression(request.Response.Syntax, default))
.WithSemicolonToken(Token(SyntaxKind.SemicolonToken));
yield break;
}
diff --git a/src/JsonRpc.Testing/JsonRpcTestBase.cs b/src/JsonRpc.Testing/JsonRpcTestBase.cs
index 4b0accc07..991ca6165 100644
--- a/src/JsonRpc.Testing/JsonRpcTestBase.cs
+++ b/src/JsonRpc.Testing/JsonRpcTestBase.cs
@@ -1,7 +1,9 @@
using System;
using System.Diagnostics;
using System.Reactive;
+using System.Reactive.Concurrency;
using System.Reactive.Disposables;
+using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
diff --git a/src/JsonRpc.Testing/JsonRpcTestOptions.cs b/src/JsonRpc.Testing/JsonRpcTestOptions.cs
index 13df29487..1b998acab 100644
--- a/src/JsonRpc.Testing/JsonRpcTestOptions.cs
+++ b/src/JsonRpc.Testing/JsonRpcTestOptions.cs
@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
+using System.Reactive.Concurrency;
using System.Reflection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@@ -24,6 +25,8 @@ public JsonRpcTestOptions(ILoggerFactory clientLoggerFactory, ILoggerFactory ser
public ILoggerFactory ClientLoggerFactory { get; internal set; } = NullLoggerFactory.Instance;
public ILoggerFactory ServerLoggerFactory { get; internal set; } = NullLoggerFactory.Instance;
+ public IScheduler ClientScheduler { get; internal set; } = TaskPoolScheduler.Default;
+ public IScheduler ServerScheduler { get; internal set; } = TaskPoolScheduler.Default;
public TimeSpan WaitTime { get; internal set; } = TimeSpan.FromMilliseconds(50);
public TimeSpan Timeout { get; internal set; } = TimeSpan.FromMilliseconds(500);
public TimeSpan CancellationTimeout { get; internal set; } = TimeSpan.FromSeconds(50);
diff --git a/src/JsonRpc.Testing/JsonRpcTestOptionsExtensions.cs b/src/JsonRpc.Testing/JsonRpcTestOptionsExtensions.cs
index 1fa284b70..b367ff631 100644
--- a/src/JsonRpc.Testing/JsonRpcTestOptionsExtensions.cs
+++ b/src/JsonRpc.Testing/JsonRpcTestOptionsExtensions.cs
@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
+using System.Reactive.Concurrency;
using System.Reflection;
using Microsoft.Extensions.Logging;
@@ -21,6 +22,30 @@ public static JsonRpcTestOptions WithClientLoggerFactory(this JsonRpcTestOptions
return options;
}
+ public static JsonRpcTestOptions WithLoggerFactory(this JsonRpcTestOptions options, ILoggerFactory loggerFactory)
+ {
+ options.ClientLoggerFactory = options.ServerLoggerFactory = loggerFactory;
+ return options;
+ }
+
+ public static JsonRpcTestOptions WithServerScheduler(this JsonRpcTestOptions options, IScheduler scheduler)
+ {
+ options.ServerScheduler = scheduler;
+ return options;
+ }
+
+ public static JsonRpcTestOptions WithClientScheduler(this JsonRpcTestOptions options, IScheduler scheduler)
+ {
+ options.ClientScheduler = scheduler;
+ return options;
+ }
+
+ public static JsonRpcTestOptions WithScheduler(this JsonRpcTestOptions options, IScheduler scheduler)
+ {
+ options.ClientScheduler = options.ServerScheduler = scheduler;
+ return options;
+ }
+
public static JsonRpcTestOptions WithWaitTime(this JsonRpcTestOptions options, TimeSpan waitTime)
{
options.WaitTime = waitTime;
diff --git a/src/JsonRpc.Testing/Settler.cs b/src/JsonRpc.Testing/Settler.cs
index 2fc758d70..f25ee60e3 100644
--- a/src/JsonRpc.Testing/Settler.cs
+++ b/src/JsonRpc.Testing/Settler.cs
@@ -60,7 +60,7 @@ public Settler(JsonRpcTestOptions options, CancellationToken cancellationToken,
_requester = subject.AsObserver();
}
- public Task SettleNext() => SettleNextInternal().ToTask(_cancellationToken);
+ public Task SettleNext() => SettleNextInternal().ToTask(_cancellationToken, _scheduler);
public IObservable SettleNextInternal() => _settle
.Catch(_ => _timeoutValue)
diff --git a/src/JsonRpc/Client/OutgoingNotification.cs b/src/JsonRpc/Client/OutgoingNotification.cs
index 750249a02..a1480ae91 100644
--- a/src/JsonRpc/Client/OutgoingNotification.cs
+++ b/src/JsonRpc/Client/OutgoingNotification.cs
@@ -1,6 +1,6 @@
namespace OmniSharp.Extensions.JsonRpc.Client
{
- public class OutgoingNotification
+ public record OutgoingNotification
{
public string Method { get; set; } = null!;
diff --git a/src/JsonRpc/Client/OutgoingRequest.cs b/src/JsonRpc/Client/OutgoingRequest.cs
index 6252de1aa..787e00591 100644
--- a/src/JsonRpc/Client/OutgoingRequest.cs
+++ b/src/JsonRpc/Client/OutgoingRequest.cs
@@ -2,7 +2,7 @@
namespace OmniSharp.Extensions.JsonRpc.Client
{
- public class OutgoingRequest
+ public record OutgoingRequest
{
public object? Id { get; set; }
diff --git a/src/JsonRpc/Client/OutgoingResponse.cs b/src/JsonRpc/Client/OutgoingResponse.cs
index 0b4dfa3fe..582e53c58 100644
--- a/src/JsonRpc/Client/OutgoingResponse.cs
+++ b/src/JsonRpc/Client/OutgoingResponse.cs
@@ -2,7 +2,7 @@
namespace OmniSharp.Extensions.JsonRpc.Client
{
- public class OutgoingResponse
+ public record OutgoingResponse
{
public OutgoingResponse(object id, ServerRequest request)
{
diff --git a/src/JsonRpc/JsonRpc.csproj b/src/JsonRpc/JsonRpc.csproj
index 81d2c2328..084fab058 100644
--- a/src/JsonRpc/JsonRpc.csproj
+++ b/src/JsonRpc/JsonRpc.csproj
@@ -7,16 +7,17 @@
Primitives for working with JsonRpc. This library is used as the base for communication with language servers
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/JsonRpc/JsonRpcServerOptionsBase.cs b/src/JsonRpc/JsonRpcServerOptionsBase.cs
index 6a513ba3e..ad3d787fe 100644
--- a/src/JsonRpc/JsonRpcServerOptionsBase.cs
+++ b/src/JsonRpc/JsonRpcServerOptionsBase.cs
@@ -37,7 +37,8 @@ public ILoggerFactory LoggerFactory
public IRequestProcessIdentifier? RequestProcessIdentifier { get; set; }
public int? Concurrency { get; set; }
public IScheduler InputScheduler { get; set; } = TaskPoolScheduler.Default;
- public IScheduler OutputScheduler { get; set; } = TaskPoolScheduler.Default;
+ public IScheduler OutputScheduler { get; set; } = Scheduler.Immediate;
+ public IScheduler DefaultScheduler { get; set; } = TaskPoolScheduler.Default;
public CreateResponseExceptionHandler? CreateResponseException { get; set; }
public OnUnhandledExceptionHandler? OnUnhandledException { get; set; }
public bool SupportsContentModified { get; set; } = true;
diff --git a/src/JsonRpc/JsonRpcServerOptionsExtensions.cs b/src/JsonRpc/JsonRpcServerOptionsExtensions.cs
index b2a821b75..6c341cc7a 100644
--- a/src/JsonRpc/JsonRpcServerOptionsExtensions.cs
+++ b/src/JsonRpc/JsonRpcServerOptionsExtensions.cs
@@ -24,7 +24,7 @@ public static JsonRpcServerOptions WithAssemblyAttributeScanning(this JsonRpcSer
///
public static JsonRpcServerOptions WithScheduler(this JsonRpcServerOptions options, IScheduler inputScheduler)
{
- options.InputScheduler = options.OutputScheduler = inputScheduler;
+ options.InputScheduler = options.OutputScheduler = options.DefaultScheduler = inputScheduler;
return options;
}
@@ -40,6 +40,18 @@ public static JsonRpcServerOptions WithInputScheduler(this JsonRpcServerOptions
return options;
}
+ ///
+ /// Sets the default scheduler to be used when scheduling other tasks
+ ///
+ ///
+ ///
+ ///
+ public static JsonRpcServerOptions WithDefaultScheduler(this JsonRpcServerOptions options, IScheduler defaultScheduler)
+ {
+ options.DefaultScheduler = defaultScheduler;
+ return options;
+ }
+
///
/// Sets the scheduler use during writing output
///
diff --git a/src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs b/src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs
index 35538dfab..1859fd078 100644
--- a/src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs
+++ b/src/JsonRpc/JsonRpcServerServiceCollectionExtensions.cs
@@ -58,6 +58,8 @@ internal static IContainer AddJsonRpcServerCore(this IContainer container, Js
reuse: Reuse.Singleton
);
+ container.RegisterInstance(options.DefaultScheduler);
+
container.RegisterMany(
serviceTypeCondition: type => type.IsInterface,
reuse: Reuse.Singleton
diff --git a/src/JsonRpc/OutputHandler.cs b/src/JsonRpc/OutputHandler.cs
index 2b167bdc9..ef852869e 100644
--- a/src/JsonRpc/OutputHandler.cs
+++ b/src/JsonRpc/OutputHandler.cs
@@ -3,12 +3,14 @@
using System.ComponentModel;
using System.IO.Pipelines;
using System.Linq;
+using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
+using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
@@ -20,11 +22,16 @@ public class OutputHandler : IOutputHandler
private readonly ISerializer _serializer;
private readonly IEnumerable _outputFilters;
private readonly ILogger _logger;
- private readonly Subject