Skip to content

Commit a361160

Browse files
author
N. Taylor Mullen
authored
Refactor request invocation to be extensible. (#641)
* Refactor request invocation to be extensible. - The initial motivation for this change spawned from an investigation in the Razor language server where transitioning from Parallel -> Serial tasks would wait on Parallel work to finish. In scenarios when that parallel work took a long time this could result in significant editor delays in completion where you'd have Parallel (long) -> Serial -> Parallel (completion, short). I played around with changing the System.Reactive bits to have an option to not "wait" for the parallel stacks but System.Reatcive as a library wasn't truly built to handle that type of "change your mind after-the-fact" flow. - Prior to this change the routing & scheduling aspects of the JsonRpc stack are bound to our ProcessScheduler & InputHandler.RouteRequest & InputHandler.RouteNotification endspoints. This change allows that entire stack to be extensible so consumers can plug & play. - Added a `RequestInvoker` type which represents the core logic of how the framework invokes a handler for a request. This encapsulates the control flow for invoking, scheduling and handling fallout from invoking a handler. - Added a `RequestInvokerOptions` type to represent what sort of settings should be applied for the request invoker paradigm. - Expanded `InputHandler` & `Connection` to have two new constructors that take in a request invoker and obsoleted the old ones. Updated tests to account for this. - Registered the default request invoker type (the one that uses System.Reactive) if a request invoker was not already registered. * Make existing request response types fully public - For consumers who are creating their own `RequestInvoker` they need to manually construct many of our response types. Therefore, the constructors need to also be puclic. * Fix test. * Addressed code review comments
1 parent de1591e commit a361160

12 files changed

+394
-199
lines changed

src/JsonRpc/Connection.cs

+35-6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ public class Connection : IDisposable
1111
private readonly InputHandler _inputHandler;
1212
public bool IsOpen { get; private set; }
1313

14+
[Obsolete("Use the other constructor that takes a request invoker")]
1415
public Connection(
1516
PipeReader input,
1617
IOutputHandler outputHandler,
@@ -25,21 +26,49 @@ public Connection(
2526
int concurrency,
2627
IScheduler scheduler,
2728
CreateResponseExceptionHandler? getException = null
29+
) : this(
30+
input,
31+
outputHandler,
32+
receiver,
33+
requestRouter,
34+
responseRouter,
35+
new DefaultRequestInvoker(
36+
requestRouter,
37+
outputHandler,
38+
requestProcessIdentifier,
39+
new RequestInvokerOptions(
40+
requestTimeout,
41+
supportContentModified,
42+
concurrency),
43+
loggerFactory,
44+
scheduler),
45+
loggerFactory,
46+
onUnhandledException,
47+
getException)
48+
{
49+
}
50+
51+
public Connection(
52+
PipeReader input,
53+
IOutputHandler outputHandler,
54+
IReceiver receiver,
55+
IRequestRouter<IHandlerDescriptor?> requestRouter,
56+
IResponseRouter responseRouter,
57+
RequestInvoker requestInvoker,
58+
ILoggerFactory loggerFactory,
59+
OnUnhandledExceptionHandler onUnhandledException,
60+
CreateResponseExceptionHandler? getException = null
2861
) =>
2962
_inputHandler = new InputHandler(
3063
input,
3164
outputHandler,
3265
receiver,
33-
requestProcessIdentifier,
3466
requestRouter,
3567
responseRouter,
68+
requestInvoker,
3669
loggerFactory,
3770
onUnhandledException,
38-
getException,
39-
requestTimeout,
40-
supportContentModified,
41-
concurrency > 1 ? (int?) concurrency : null,
42-
scheduler
71+
getException
4372
);
4473

4574
public void Open()

src/JsonRpc/ContentModified.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
using OmniSharp.Extensions.JsonRpc.Server;
1+
using OmniSharp.Extensions.JsonRpc.Server;
22
using OmniSharp.Extensions.JsonRpc.Server.Messages;
33

44
namespace OmniSharp.Extensions.JsonRpc
55
{
66
public class ContentModified : RpcError
77
{
8-
internal ContentModified(string method) : base(null, method, new ErrorMessage(ErrorCodes.ContentModified, "Content Modified"))
8+
public ContentModified(string method) : base(null, method, new ErrorMessage(ErrorCodes.ContentModified, "Content Modified"))
99
{
1010
}
1111

12-
internal ContentModified(object id, string method) : base(id, method, new ErrorMessage(ErrorCodes.ContentModified, "Content Modified"))
12+
public ContentModified(object id, string method) : base(id, method, new ErrorMessage(ErrorCodes.ContentModified, "Content Modified"))
1313
{
1414
}
1515
}

src/JsonRpc/DefaultRequestInvoker.cs

+181
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
using System;
2+
using System.Reactive;
3+
using System.Reactive.Concurrency;
4+
using System.Reactive.Disposables;
5+
using System.Reactive.Linq;
6+
using Microsoft.Extensions.Logging;
7+
using OmniSharp.Extensions.JsonRpc.Server;
8+
using OmniSharp.Extensions.JsonRpc.Server.Messages;
9+
using Notification = OmniSharp.Extensions.JsonRpc.Server.Notification;
10+
11+
namespace OmniSharp.Extensions.JsonRpc
12+
{
13+
public class DefaultRequestInvoker : RequestInvoker
14+
{
15+
private readonly IRequestRouter<IHandlerDescriptor?> _requestRouter;
16+
private readonly IOutputHandler _outputHandler;
17+
private readonly ProcessScheduler _processScheduler;
18+
private readonly IRequestProcessIdentifier _requestProcessIdentifier;
19+
private readonly RequestInvokerOptions _options;
20+
private readonly ILogger<DefaultRequestInvoker> _logger;
21+
22+
public DefaultRequestInvoker(
23+
IRequestRouter<IHandlerDescriptor?> requestRouter,
24+
IOutputHandler outputHandler,
25+
IRequestProcessIdentifier requestProcessIdentifier,
26+
RequestInvokerOptions options,
27+
ILoggerFactory loggerFactory,
28+
IScheduler scheduler)
29+
{
30+
_requestRouter = requestRouter;
31+
_outputHandler = outputHandler;
32+
_requestProcessIdentifier = requestProcessIdentifier;
33+
_options = options;
34+
_processScheduler = new ProcessScheduler(loggerFactory, _options.SupportContentModified, _options.Concurrency, scheduler);
35+
_logger = loggerFactory.CreateLogger<DefaultRequestInvoker>();
36+
}
37+
38+
public override RequestInvocationHandle InvokeRequest(IRequestDescriptor<IHandlerDescriptor?> descriptor, Request request)
39+
{
40+
if (descriptor.Default is null)
41+
{
42+
throw new ArgumentNullException(nameof(descriptor.Default));
43+
}
44+
45+
var handle = new RequestInvocationHandle(request);
46+
var type = _requestProcessIdentifier.Identify(descriptor.Default);
47+
48+
var schedulerDelegate = RouteRequest(descriptor, request, handle);
49+
_processScheduler.Add(type, $"{request.Method}:{request.Id}", schedulerDelegate);
50+
51+
return handle;
52+
}
53+
54+
public override void InvokeNotification(IRequestDescriptor<IHandlerDescriptor?> descriptor, Notification notification)
55+
{
56+
if (descriptor.Default is null)
57+
{
58+
throw new ArgumentNullException(nameof(descriptor.Default));
59+
}
60+
61+
var type = _requestProcessIdentifier.Identify(descriptor.Default);
62+
var schedulerDelegate = RouteNotification(descriptor, notification);
63+
_processScheduler.Add(type, notification.Method, schedulerDelegate);
64+
}
65+
66+
public override void Dispose()
67+
{
68+
_processScheduler.Dispose();
69+
}
70+
71+
private SchedulerDelegate RouteRequest(
72+
IRequestDescriptor<IHandlerDescriptor?> descriptor,
73+
Request request,
74+
RequestInvocationHandle handle)
75+
{
76+
var cts = handle.CancellationTokenSource;
77+
return (contentModifiedToken, scheduler) =>
78+
Observable.Create<ErrorResponse>(
79+
observer => {
80+
// ITS A RACE!
81+
var sub = Observable.Amb(
82+
contentModifiedToken.Select(
83+
_ => {
84+
_logger.LogTrace(
85+
"Request {Id} was abandoned due to content be modified", request.Id
86+
);
87+
return new ErrorResponse(
88+
new ContentModified(request.Id, request.Method)
89+
);
90+
}
91+
),
92+
Observable.Timer(_options.RequestTimeout, scheduler).Select(
93+
_ => new ErrorResponse(new RequestCancelled(request.Id, request.Method))
94+
),
95+
Observable.FromAsync(
96+
async ct => {
97+
using var timer = _logger.TimeDebug(
98+
"Processing request {Method} {ResponseId}", request.Method,
99+
request.Id
100+
);
101+
ct.Register(cts.Cancel);
102+
// ObservableToToken(contentModifiedToken).Register(cts.Cancel);
103+
try
104+
{
105+
var result = await _requestRouter.RouteRequest(
106+
descriptor, request, cts.Token
107+
).ConfigureAwait(false);
108+
return result;
109+
}
110+
catch (OperationCanceledException)
111+
{
112+
_logger.LogTrace("Request {Id} was cancelled", request.Id);
113+
return new RequestCancelled(request.Id, request.Method);
114+
}
115+
catch (RpcErrorException e)
116+
{
117+
_logger.LogCritical(
118+
Events.UnhandledRequest, e,
119+
"Failed to handle request {Method} {RequestId}", request.Method,
120+
request.Id
121+
);
122+
return new RpcError(
123+
request.Id, request.Method,
124+
new ErrorMessage(e.Code, e.Message, e.Error)
125+
);
126+
}
127+
catch (Exception e)
128+
{
129+
_logger.LogCritical(
130+
Events.UnhandledRequest, e,
131+
"Failed to handle request {Method} {RequestId}", request.Method,
132+
request.Id
133+
);
134+
return new InternalError(request.Id, request.Method, e.ToString());
135+
}
136+
}
137+
)
138+
)
139+
.Subscribe(observer);
140+
return new CompositeDisposable(sub, handle);
141+
}
142+
)
143+
.Select(
144+
response => {
145+
_outputHandler.Send(response.Value);
146+
return Unit.Default;
147+
}
148+
);
149+
}
150+
151+
private SchedulerDelegate RouteNotification(
152+
IRequestDescriptor<IHandlerDescriptor?> descriptors,
153+
Notification notification) =>
154+
(_, scheduler) =>
155+
// ITS A RACE!
156+
Observable.Amb(
157+
Observable.Timer(_options.RequestTimeout, scheduler)
158+
.Select(_ => Unit.Default)
159+
.Do(
160+
_ => _logger.LogTrace("Notification was cancelled due to timeout")
161+
),
162+
Observable.FromAsync(
163+
async ct => {
164+
using var timer = _logger.TimeDebug("Processing notification {Method}", notification.Method);
165+
try
166+
{
167+
await _requestRouter.RouteNotification(descriptors, notification, ct).ConfigureAwait(false);
168+
}
169+
catch (OperationCanceledException)
170+
{
171+
_logger.LogTrace("Notification was cancelled");
172+
}
173+
catch (Exception e)
174+
{
175+
_logger.LogCritical(Events.UnhandledRequest, e, "Failed to handle request {Method}", notification.Method);
176+
}
177+
}
178+
)
179+
);
180+
}
181+
}

0 commit comments

Comments
 (0)