Skip to content

Commit e0cc201

Browse files
Fixed how partial items are scheduled (#565)
* Fixed issues with scheduling around partial items and results. This will fix flakey tests, as well as possible edge cases where partial items are consumed in rapid succession * Ensure that progress observable task is created before the handler method is called to avoid a race condition where no value is picked up potentially * register UnregisterCapability disposable earlier in the process * change how completion works * overhaul the outputhandler
1 parent 917c998 commit e0cc201

File tree

96 files changed

+1244
-666
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

96 files changed

+1244
-666
lines changed

Directory.Build.targets

+53-52
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,53 @@
1-
<?xml version="1.0" encoding="utf-8"?>
2-
<Project>
3-
<PropertyGroup>
4-
<CompilerGeneratedFilesOutputPath>$(BaseIntermediateOutputPath)\GeneratedFiles</CompilerGeneratedFilesOutputPath>
5-
</PropertyGroup>
6-
<ItemGroup>
7-
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
8-
<PackageReference Include="Rocket.Surgery.MSBuild.CI" Version="1.1.0" PrivateAssets="All" />
9-
<PackageReference Include="Rocket.Surgery.MSBuild.Metadata" Version="1.1.0" PrivateAssets="All" />
10-
<PackageReference Include="Rocket.Surgery.MSBuild.SourceLink" Version="1.1.0" PrivateAssets="All" />
11-
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" />
12-
</ItemGroup>
13-
<ItemGroup>
14-
<PackageReference Update="GitVersion.Tool" Version="5.6.6" />
15-
<PackageReference Update="JetBrains.ReSharper.CommandLineTools" Version="2020.3.3" />
16-
<PackageReference Update="ReportGenerator" Version="4.8.6" />
17-
<PackageReference Update="Rocket.Surgery.Nuke" Version="0.14.3" />
18-
</ItemGroup>
19-
<ItemGroup>
20-
<PackageReference Update="Microsoft.Extensions.Logging" Version="2.0.0" />
21-
<PackageReference Update="Microsoft.Extensions.Logging.Debug" Version="2.0.0" />
22-
<PackageReference Update="Microsoft.Extensions.Configuration" Version="2.0.0" />
23-
<PackageReference Update="Microsoft.Extensions.Configuration.Binder" Version="2.0.0" />
24-
<PackageReference Update="Microsoft.Extensions.Options" Version="2.0.0" />
25-
<PackageReference Update="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.0.0" />
26-
<PackageReference Update="Microsoft.Extensions.DependencyInjection" Version="2.0.0" />
27-
<PackageReference Update="Newtonsoft.Json" Version="11.0.2" />
28-
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="16.9.1" />
29-
<PackageReference Update="xunit.runner.visualstudio" Version="2.4.3" />
30-
<PackageReference Update="xunit" Version="2.4.1" />
31-
<PackageReference Update="FluentAssertions" Version="5.10.3" />
32-
<PackageReference Update="NSubstitute" Version="4.2.2" />
33-
<PackageReference Update="Serilog.Extensions.Logging" Version="2.0.2" />
34-
<PackageReference Update="Serilog.Sinks.Observable" Version="2.0.2" />
35-
<PackageReference Update="Serilog.Sinks.XUnit" Version="2.0.4" />
36-
<PackageReference Update="XunitXml.TestLogger" Version="3.0.62" />
37-
<PackageReference Update="coverlet.collector" Version="3.0.3" />
38-
<PackageReference Update="coverlet.msbuild" Version="3.0.3" />
39-
<PackageReference Update="System.Reactive" Version="4.4.1" />
40-
<PackageReference Update="System.Collections.Immutable" Version="1.7.1" />
41-
<PackageReference Update="Microsoft.Reactive.Testing" Version="4.4.1" />
42-
<PackageReference Update="MediatR" Version="8.1.0" />
43-
<PackageReference Update="Bogus" Version="33.0.2" />
44-
<PackageReference Update="Snapper" Version="2.3.0" />
45-
<PackageReference Update="Xunit.SkippableFact" Version="1.4.13" />
46-
<PackageReference Update="System.IO.Pipelines" Version="4.7.3" />
47-
<PackageReference Update="Nerdbank.Streams" Version="2.6.81" />
48-
<PackageReference Update="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="3.8.0" />
49-
<PackageReference Update="Microsoft.CodeAnalysis.Analyzers" Version="3.3.2" />
50-
<PackageReference Update="DryIoc.Internal" Version="4.7.3" />
51-
</ItemGroup>
52-
</Project>
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<Project>
3+
<PropertyGroup>
4+
<CompilerGeneratedFilesOutputPath>$(BaseIntermediateOutputPath)\GeneratedFiles</CompilerGeneratedFilesOutputPath>
5+
</PropertyGroup>
6+
<ItemGroup>
7+
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
8+
<PackageReference Include="Rocket.Surgery.MSBuild.CI" Version="1.1.0" PrivateAssets="All"/>
9+
<PackageReference Include="Rocket.Surgery.MSBuild.Metadata" Version="1.1.0" PrivateAssets="All"/>
10+
<PackageReference Include="Rocket.Surgery.MSBuild.SourceLink" Version="1.1.0" PrivateAssets="All"/>
11+
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All"/>
12+
</ItemGroup>
13+
<ItemGroup>
14+
<PackageReference Update="GitVersion.Tool" Version="5.6.6"/>
15+
<PackageReference Update="JetBrains.ReSharper.CommandLineTools" Version="2020.3.3"/>
16+
<PackageReference Update="ReportGenerator" Version="4.8.6"/>
17+
<PackageReference Update="Rocket.Surgery.Nuke" Version="0.14.3"/>
18+
</ItemGroup>
19+
<ItemGroup>
20+
<PackageReference Update="Microsoft.Extensions.Logging" Version="2.0.0"/>
21+
<PackageReference Update="Microsoft.Extensions.Logging.Debug" Version="2.0.0"/>
22+
<PackageReference Update="Microsoft.Extensions.Configuration" Version="2.0.0"/>
23+
<PackageReference Update="Microsoft.Extensions.Configuration.Binder" Version="2.0.0"/>
24+
<PackageReference Update="Microsoft.Extensions.Options" Version="2.0.0"/>
25+
<PackageReference Update="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.0.0"/>
26+
<PackageReference Update="Microsoft.Extensions.DependencyInjection" Version="2.0.0"/>
27+
<PackageReference Update="Newtonsoft.Json" Version="11.0.2"/>
28+
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="16.9.1"/>
29+
<PackageReference Update="xunit.runner.visualstudio" Version="2.4.3"/>
30+
<PackageReference Update="xunit" Version="2.4.1"/>
31+
<PackageReference Update="FluentAssertions" Version="5.10.3"/>
32+
<PackageReference Update="NSubstitute" Version="4.2.2"/>
33+
<PackageReference Update="Serilog.Extensions.Logging" Version="2.0.2"/>
34+
<PackageReference Update="Serilog.Sinks.Observable" Version="2.0.2"/>
35+
<PackageReference Update="Serilog.Sinks.XUnit" Version="2.0.4"/>
36+
<PackageReference Update="XunitXml.TestLogger" Version="3.0.62"/>
37+
<PackageReference Update="coverlet.collector" Version="3.0.3"/>
38+
<PackageReference Update="coverlet.msbuild" Version="3.0.3"/>
39+
<PackageReference Update="System.Reactive" Version="4.4.1"/>
40+
<PackageReference Update="System.Collections.Immutable" Version="1.7.1"/>
41+
<PackageReference Update="System.Threading.Channels" Version="4.7.1"/>
42+
<PackageReference Update="Microsoft.Reactive.Testing" Version="4.4.1"/>
43+
<PackageReference Update="MediatR" Version="8.1.0"/>
44+
<PackageReference Update="Bogus" Version="33.0.2"/>
45+
<PackageReference Update="Snapper" Version="2.3.0"/>
46+
<PackageReference Update="Xunit.SkippableFact" Version="1.4.13"/>
47+
<PackageReference Update="System.IO.Pipelines" Version="4.7.3"/>
48+
<PackageReference Update="Nerdbank.Streams" Version="2.6.81"/>
49+
<PackageReference Update="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="3.8.0"/>
50+
<PackageReference Update="Microsoft.CodeAnalysis.Analyzers" Version="3.3.2"/>
51+
<PackageReference Update="DryIoc.Internal" Version="4.7.3"/>
52+
</ItemGroup>
53+
</Project>

src/Client/LanguageClient.cs

+9-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Diagnostics.CodeAnalysis;
44
using System.Linq;
5+
using System.Reactive.Concurrency;
56
using System.Reactive.Disposables;
67
using System.Reactive.Linq;
78
using System.Reactive.Subjects;
@@ -48,6 +49,7 @@ public class LanguageClient : JsonRpcServerBase, ILanguageClient
4849
private readonly IEnumerable<IOnLanguageClientInitialized> _initializedHandlers;
4950
private readonly LspSerializer _serializer;
5051
private readonly InstanceHasStarted _instanceHasStarted;
52+
private readonly IScheduler _scheduler;
5153
private readonly IResponseRouter _responseRouter;
5254
private readonly ISubject<InitializeResult> _initializeComplete = new AsyncSubject<InitializeResult>();
5355
private readonly CompositeDisposable _disposable = new CompositeDisposable();
@@ -150,7 +152,8 @@ internal LanguageClient(
150152
IEnumerable<OnLanguageClientInitializedDelegate> initializedDelegates,
151153
IEnumerable<IOnLanguageClientInitialized> initializedHandlers,
152154
LspSerializer serializer,
153-
InstanceHasStarted instanceHasStarted
155+
InstanceHasStarted instanceHasStarted,
156+
IScheduler scheduler
154157
) : base(handlerCollection, responseRouter)
155158
{
156159
_connection = connection;
@@ -179,6 +182,7 @@ InstanceHasStarted instanceHasStarted
179182
_initializedHandlers = initializedHandlers;
180183
_serializer = serializer;
181184
_instanceHasStarted = instanceHasStarted;
185+
_scheduler = scheduler;
182186
_concurrency = options.Value.Concurrency;
183187

184188
// We need to at least create Window here in case any handler does loggin in their constructor
@@ -262,6 +266,7 @@ await LanguageProtocolEventingHelper.Run(
262266
_initializeHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnLanguageClientInitialize>()),
263267
(handler, ct) => handler.OnInitialize(this, @params, ct),
264268
_concurrency,
269+
_scheduler,
265270
token
266271
).ConfigureAwait(false);
267272

@@ -281,6 +286,7 @@ await LanguageProtocolEventingHelper.Run(
281286
_initializedHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnLanguageClientInitialized>()),
282287
(handler, ct) => handler.OnInitialized(this, @params, serverParams, ct),
283288
_concurrency,
289+
_scheduler,
284290
token
285291
).ConfigureAwait(false);
286292

@@ -299,6 +305,7 @@ await LanguageProtocolEventingHelper.Run(
299305
_startedHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnLanguageClientStarted>()),
300306
(handler, ct) => handler.OnStarted(this, ct),
301307
_concurrency,
308+
_scheduler,
302309
token
303310
).ConfigureAwait(false);
304311

@@ -395,7 +402,7 @@ private Supports<T> UseOrTryAndFindCapability<T>(Supports<T> supports) where T :
395402
bool IResponseRouter.TryGetRequest(long id, [NotNullWhen(true)] out string method, [NotNullWhen(true)] out TaskCompletionSource<JToken> pendingTask) =>
396403
_responseRouter.TryGetRequest(id, out method, out pendingTask);
397404

398-
public Task<InitializeResult> WasStarted => _initializeComplete.ToTask();
405+
public Task<InitializeResult> WasStarted => _initializeComplete.ToTask(_scheduler);
399406

400407
public void Dispose()
401408
{

src/Client/LanguageClientOptionsExtensions.cs

+49
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Reactive.Concurrency;
23
using Microsoft.Extensions.Configuration;
34
using Microsoft.Extensions.DependencyInjection;
45
using Microsoft.Extensions.Logging;
@@ -87,6 +88,54 @@ public static LanguageClientOptions WithClientCapabilities(this LanguageClientOp
8788
return options;
8889
}
8990

91+
/// <summary>
92+
/// Sets both input and output schedulers to the same scheduler
93+
/// </summary>
94+
/// <param name="options"></param>
95+
/// <param name="inputScheduler"></param>
96+
/// <returns></returns>
97+
public static LanguageClientOptions WithScheduler(this LanguageClientOptions options, IScheduler inputScheduler)
98+
{
99+
options.InputScheduler = options.OutputScheduler = options.DefaultScheduler = inputScheduler;
100+
return options;
101+
}
102+
103+
/// <summary>
104+
/// Sets the scheduler used during reading input
105+
/// </summary>
106+
/// <param name="options"></param>
107+
/// <param name="inputScheduler"></param>
108+
/// <returns></returns>
109+
public static LanguageClientOptions WithInputScheduler(this LanguageClientOptions options, IScheduler inputScheduler)
110+
{
111+
options.InputScheduler = inputScheduler;
112+
return options;
113+
}
114+
115+
/// <summary>
116+
/// Sets the default scheduler to be used when scheduling other tasks
117+
/// </summary>
118+
/// <param name="options"></param>
119+
/// <param name="defaultScheduler"></param>
120+
/// <returns></returns>
121+
public static LanguageClientOptions WithDefaultScheduler(this LanguageClientOptions options, IScheduler defaultScheduler)
122+
{
123+
options.DefaultScheduler = defaultScheduler;
124+
return options;
125+
}
126+
127+
/// <summary>
128+
/// Sets the scheduler use during writing output
129+
/// </summary>
130+
/// <param name="options"></param>
131+
/// <param name="outputScheduler"></param>
132+
/// <returns></returns>
133+
public static LanguageClientOptions WithOutputScheduler(this LanguageClientOptions options, IScheduler outputScheduler)
134+
{
135+
options.OutputScheduler = outputScheduler;
136+
return options;
137+
}
138+
90139
public static LanguageClientOptions OnInitialize(this LanguageClientOptions options, OnLanguageClientInitializeDelegate @delegate)
91140
{
92141
options.Services.AddSingleton(@delegate);

src/Client/LanguageClientRegistrationManager.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
44
using System.Linq;
5+
using System.Reactive.Concurrency;
56
using System.Reactive.Linq;
67
using System.Reactive.Subjects;
78
using System.Threading;
@@ -26,7 +27,7 @@ internal class LanguageClientRegistrationManager : IRegisterCapabilityHandler, I
2627
private readonly ILspHandlerTypeDescriptorProvider _handlerTypeDescriptorProvider;
2728
private readonly ILogger<LanguageClientRegistrationManager> _logger;
2829
private readonly ConcurrentDictionary<string, Registration> _registrations;
29-
private readonly ReplaySubject<IEnumerable<Registration>> _registrationSubject = new ReplaySubject<IEnumerable<Registration>>(1);
30+
private readonly ReplaySubject<IEnumerable<Registration>> _registrationSubject = new ReplaySubject<IEnumerable<Registration>>(1, Scheduler.Immediate);
3031

3132
public LanguageClientRegistrationManager(
3233
ISerializer serializer,

src/Client/LanguageClientWorkspaceFoldersManager.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
44
using System.Linq;
5+
using System.Reactive.Concurrency;
56
using System.Reactive.Linq;
67
using System.Reactive.Subjects;
78
using System.Threading;
@@ -25,7 +26,7 @@ public LanguageClientWorkspaceFoldersManager(IWorkspaceLanguageClient client, IE
2526
{
2627
_client = client;
2728
_workspaceFolders = new ConcurrentDictionary<DocumentUri, WorkspaceFolder>(DocumentUri.Comparer);
28-
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1);
29+
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1, Scheduler.Immediate);
2930

3031
foreach (var folder in workspaceFolders)
3132
{

src/Dap.Client/DebugAdapterClient.cs

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Reactive.Concurrency;
45
using System.Reactive.Disposables;
56
using System.Reactive.Subjects;
67
using System.Reactive.Threading.Tasks;
@@ -30,6 +31,7 @@ public class DebugAdapterClient : JsonRpcServerBase, IDebugAdapterClient, IDebug
3031
private readonly IEnumerable<OnDebugAdapterClientStartedDelegate> _startedDelegates;
3132
private readonly IEnumerable<IOnDebugAdapterClientStarted> _startedHandlers;
3233
private readonly InstanceHasStarted _instanceHasStarted;
34+
private readonly IScheduler _scheduler;
3335
private readonly CompositeDisposable _disposable = new CompositeDisposable();
3436
private readonly Connection _connection;
3537
private readonly DapReceiver _receiver;
@@ -97,7 +99,8 @@ internal DebugAdapterClient(
9799
IEnumerable<OnDebugAdapterClientInitializedDelegate> initializedDelegates,
98100
IEnumerable<IOnDebugAdapterClientInitialized> initializedHandlers,
99101
IEnumerable<IOnDebugAdapterClientStarted> startedHandlers,
100-
InstanceHasStarted instanceHasStarted
102+
InstanceHasStarted instanceHasStarted,
103+
IScheduler scheduler
101104
) : base(collection, responseRouter)
102105
{
103106
_settingsBag = settingsBag;
@@ -114,6 +117,7 @@ InstanceHasStarted instanceHasStarted
114117
_initializedHandlers = initializedHandlers;
115118
_startedHandlers = startedHandlers;
116119
_instanceHasStarted = instanceHasStarted;
120+
_scheduler = scheduler;
117121
_concurrency = options.Value.Concurrency;
118122

119123
_disposable.Add(collection.Add(this));
@@ -127,6 +131,7 @@ await DebugAdapterEventingHelper.Run(
127131
_initializeHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnDebugAdapterClientInitialize>()),
128132
(handler, ct) => handler.OnInitialize(this, ClientSettings, ct),
129133
_concurrency,
134+
_scheduler,
130135
token
131136
).ConfigureAwait(false);
132137

@@ -145,17 +150,19 @@ await DebugAdapterEventingHelper.Run(
145150
_initializedHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnDebugAdapterClientInitialized>()),
146151
(handler, ct) => handler.OnInitialized(this, ClientSettings, ServerSettings, ct),
147152
_concurrency,
153+
_scheduler,
148154
token
149155
).ConfigureAwait(false);
150156

151-
await _initializedComplete.ToTask(token);
157+
await _initializedComplete.ToTask(token, _scheduler);
152158

153159
await DebugAdapterEventingHelper.Run(
154160
_startedDelegates,
155161
(handler, ct) => handler(this, ct),
156162
_startedHandlers.Union(_collection.Select(z => z.Handler).OfType<IOnDebugAdapterClientStarted>()),
157163
(handler, ct) => handler.OnStarted(this, ct),
158164
_concurrency,
165+
_scheduler,
159166
token
160167
).ConfigureAwait(false);
161168

0 commit comments

Comments
 (0)