Skip to content

Commit 9003a11

Browse files
Ensure that progress observable task is created before the handler method is called to avoid a race condition where no value is picked up potentially
1 parent 81b402f commit 9003a11

File tree

6 files changed

+191
-107
lines changed

6 files changed

+191
-107
lines changed

src/Protocol/AbstractHandlers.cs

+49-27
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,14 @@ CancellationToken cancellationToken
141141
}
142142

143143
var subject = new AsyncSubject<TItem?>();
144+
var task = subject
145+
.Select(_factory)
146+
.ToTask(cancellationToken, _progressManager.Scheduler)
147+
.ConfigureAwait(false);
144148
// in the event nothing is emitted...
145149
subject.OnNext(default!);
146150
Handle(request, subject, cancellationToken);
147-
return _factory(await subject);
151+
return await task;
148152
}
149153

150154
protected abstract void Handle(TParams request, IObserver<TItem> results, CancellationToken cancellationToken);
@@ -181,10 +185,14 @@ CancellationToken cancellationToken
181185
}
182186

183187
var subject = new AsyncSubject<TItem?>();
188+
var task = subject
189+
.Select(_factory)
190+
.ToTask(cancellationToken, _progressManager.Scheduler)
191+
.ConfigureAwait(false);
184192
// in the event nothing is emitted...
185193
subject.OnNext(default!);
186194
Handle(request, subject, cancellationToken);
187-
return _factory(await subject);
195+
return await task;
188196
}
189197

190198
protected abstract void Handle(TParams request, IObserver<TItem> results, CancellationToken cancellationToken);
@@ -221,10 +229,14 @@ CancellationToken cancellationToken
221229
}
222230

223231
var subject = new AsyncSubject<TItem>();
232+
var task = subject
233+
.Select(_factory)
234+
.ToTask(cancellationToken, _progressManager.Scheduler)
235+
.ConfigureAwait(false);
224236
// in the event nothing is emitted...
225237
subject.OnNext(default!);
226238
Handle(request, subject, cancellationToken);
227-
return _factory(await subject);
239+
return await task;
228240
}
229241

230242
protected abstract void Handle(TParams request, IObserver<TItem> results, CancellationToken cancellationToken);
@@ -258,15 +270,18 @@ protected PartialResults(IProgressManager progressManager, Func<IEnumerable<TIte
258270
}
259271

260272
var subject = new Subject<IEnumerable<TItem>>();
261-
var task = subject.Aggregate(
262-
new List<TItem>(), (acc, items) => {
263-
acc.AddRange(items);
264-
return acc;
265-
}
266-
)
267-
.ToTask(cancellationToken, _progressManager.Scheduler);
273+
var task = subject
274+
.Aggregate(
275+
new List<TItem>(), (acc, items) => {
276+
acc.AddRange(items);
277+
return acc;
278+
}
279+
)
280+
.Select(_factory)
281+
.ToTask(cancellationToken, _progressManager.Scheduler)
282+
.ConfigureAwait(false);
268283
Handle(request, subject, cancellationToken);
269-
return _factory(await task.ConfigureAwait(false));
284+
return await task;
270285
}
271286

272287
protected abstract void Handle(TParams request, IObserver<IEnumerable<TItem>> results, CancellationToken cancellationToken);
@@ -299,19 +314,23 @@ protected PartialResults(IProgressManager progressManager, Func<IEnumerable<TIte
299314
}
300315

301316
var subject = new Subject<IEnumerable<TItem>>();
302-
var task = subject.Aggregate(
303-
new List<TItem>(), (acc, items) => {
304-
acc.AddRange(items);
305-
return acc;
306-
}
307-
)
308-
.ToTask(cancellationToken, _progressManager.Scheduler);
317+
var task = subject
318+
.Aggregate(
319+
new List<TItem>(), (acc, items) => {
320+
acc.AddRange(items);
321+
return acc;
322+
}
323+
)
324+
.Select(_factory)
325+
.ToTask(cancellationToken, _progressManager.Scheduler)
326+
.ConfigureAwait(false);
309327
Handle(request, subject, cancellationToken);
310-
return _factory(await task.ConfigureAwait(false));
328+
return await task;
311329
}
312330

313331
protected abstract void Handle(TParams request, IObserver<IEnumerable<TItem>> results, CancellationToken cancellationToken);
314332
}
333+
315334
public abstract class PartialResultsCapability<TParams, TResponse, TItem, TCapability> :
316335
BaseCapability<TCapability>,
317336
IJsonRpcRequestHandler<TParams, TResponse?>
@@ -339,15 +358,18 @@ protected PartialResultsCapability(IProgressManager progressManager, Func<IEnume
339358
}
340359

341360
var subject = new Subject<IEnumerable<TItem>>();
342-
var task = subject.Aggregate(
343-
new List<TItem>(), (acc, items) => {
344-
acc.AddRange(items);
345-
return acc;
346-
}
347-
)
348-
.ToTask(cancellationToken, _progressManager.Scheduler);
361+
var task = subject
362+
.Aggregate(
363+
new List<TItem>(), (acc, items) => {
364+
acc.AddRange(items);
365+
return acc;
366+
}
367+
)
368+
.Select(_factory)
369+
.ToTask(cancellationToken, _progressManager.Scheduler)
370+
.ConfigureAwait(false);
349371
Handle(request, subject, cancellationToken);
350-
return _factory(await task.ConfigureAwait(false));
372+
return await task;
351373
}
352374

353375
protected abstract void Handle(TParams request, IObserver<IEnumerable<TItem>> results, CancellationToken cancellationToken);

0 commit comments

Comments
 (0)