Skip to content

Commit 48dac7b

Browse files
committed
Add EnsureCompleted task extensions.
1 parent f70513d commit 48dac7b

File tree

6 files changed

+62
-48
lines changed

6 files changed

+62
-48
lines changed

projects/RabbitMQ.Client/client/TaskExtensions.cs

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -87,40 +87,73 @@ static async Task DoTimeoutAfter(Task task, TimeSpan timeout)
8787
#endif
8888
}
8989

90-
public static async ValueTask TimeoutAfter(this ValueTask task, TimeSpan timeout)
90+
public static async ValueTask TimeoutAfter(this ValueTask valueTask, TimeSpan timeout)
9191
{
92-
if (task.IsCompletedSuccessfully)
92+
if (valueTask.IsCompletedSuccessfully)
9393
{
9494
return;
9595
}
9696

9797
#if NET6_0_OR_GREATER
98-
Task actualTask = task.AsTask();
99-
await actualTask.WaitAsync(timeout)
98+
Task task = valueTask.AsTask();
99+
await task.WaitAsync(timeout)
100100
.ConfigureAwait(false);
101101
#else
102-
await DoTimeoutAfter(task, timeout)
102+
await DoTimeoutAfter(valueTask, timeout)
103103
.ConfigureAwait(false);
104104

105-
async static ValueTask DoTimeoutAfter(ValueTask task, TimeSpan timeout)
105+
// https://github.com/davidfowl/AspNetCoreDiagnosticScenarios/blob/master/AsyncGuidance.md#using-a-timeout
106+
static async ValueTask DoTimeoutAfter(ValueTask valueTask, TimeSpan timeout)
106107
{
107-
Task actualTask = task.AsTask();
108-
if (actualTask == await Task.WhenAny(actualTask, Task.Delay(timeout)).ConfigureAwait(false))
109-
{
110-
await actualTask.ConfigureAwait(false);
111-
}
112-
else
108+
Task task = valueTask.AsTask();
109+
using (var cts = new CancellationTokenSource())
113110
{
114-
Task supressErrorTask = actualTask.ContinueWith(
115-
continuationAction: continuation,
116-
state: null,
117-
cancellationToken: CancellationToken.None,
118-
continuationOptions: s_tco,
119-
scheduler: TaskScheduler.Default);
120-
throw new TimeoutException();
111+
Task delayTask = Task.Delay(timeout, cts.Token);
112+
Task resultTask = await Task.WhenAny(task, delayTask).ConfigureAwait(false);
113+
if (resultTask == delayTask)
114+
{
115+
Task supressErrorTask = task.ContinueWith(
116+
continuationAction: continuation,
117+
state: null,
118+
cancellationToken: CancellationToken.None,
119+
continuationOptions: s_tco,
120+
scheduler: TaskScheduler.Default);
121+
throw new TimeoutException();
122+
}
123+
else
124+
{
125+
cts.Cancel();
126+
}
127+
128+
await valueTask.ConfigureAwait(false);
121129
}
122130
}
123131
#endif
124132
}
133+
134+
/*
135+
* https://devblogs.microsoft.com/dotnet/configureawait-faq/
136+
* I'm using GetAwaiter().GetResult(). Do I need to use ConfigureAwait(false)?
137+
* Answer: No
138+
*/
139+
public static void EnsureCompleted(this Task task)
140+
{
141+
task.GetAwaiter().GetResult();
142+
}
143+
144+
public static T EnsureCompleted<T>(this Task<T> task)
145+
{
146+
return task.GetAwaiter().GetResult();
147+
}
148+
149+
public static T EnsureCompleted<T>(this ValueTask<T> task)
150+
{
151+
return task.GetAwaiter().GetResult();
152+
}
153+
154+
public static void EnsureCompleted(this ValueTask task)
155+
{
156+
task.GetAwaiter().GetResult();
157+
}
125158
}
126159
}

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ await ConsumerDispatcher.WaitForShutdownAsync()
287287
}
288288
}
289289

290+
// TODO cancellation tokens
290291
internal async ValueTask ConnectionOpenAsync(string virtualHost)
291292
{
292293
var m = new ConnectionOpen(virtualHost);

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -213,12 +213,7 @@ internal void TakeOver(Connection other)
213213

214214
internal IConnection Open()
215215
{
216-
/*
217-
* https://devblogs.microsoft.com/dotnet/configureawait-faq/
218-
* I'm using GetAwaiter().GetResult(). Do I need to use ConfigureAwait(false)?
219-
* Answer: No
220-
*/
221-
return OpenAsync().GetAwaiter().GetResult();
216+
return OpenAsync().EnsureCompleted();
222217
}
223218

224219
internal async ValueTask<IConnection> OpenAsync()
@@ -530,12 +525,7 @@ internal void Write(RentedMemory frames)
530525
ValueTask task = _frameHandler.WriteAsync(frames);
531526
if (!task.IsCompletedSuccessfully)
532527
{
533-
/*
534-
* https://devblogs.microsoft.com/dotnet/configureawait-faq/
535-
* I'm using GetAwaiter().GetResult(). Do I need to use ConfigureAwait(false)?
536-
* Answer: No
537-
*/
538-
task.GetAwaiter().GetResult();
528+
task.EnsureCompleted();
539529
}
540530
}
541531

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,7 @@ public TimeSpan WriteTimeout
183183

184184
public void Close()
185185
{
186-
/*
187-
* https://devblogs.microsoft.com/dotnet/configureawait-faq/
188-
* I'm using GetAwaiter().GetResult(). Do I need to use ConfigureAwait(false)?
189-
* Answer: No
190-
*/
191-
CloseAsync().GetAwaiter().GetResult();
186+
CloseAsync().EnsureCompleted();
192187
}
193188

194189
public async ValueTask CloseAsync()
@@ -339,11 +334,9 @@ private void ConnectOrFail(ITcpClient socket, IPEndPoint endpoint, TimeSpan time
339334
{
340335
try
341336
{
337+
// this ensures exceptions aren't wrapped in an AggregateException
342338
socket.ConnectAsync(endpoint.Address, endpoint.Port)
343-
.TimeoutAfter(timeout)
344-
// this ensures exceptions aren't wrapped in an AggregateException
345-
.GetAwaiter()
346-
.GetResult();
339+
.TimeoutAfter(timeout).EnsureCompleted();
347340
}
348341
catch (ArgumentException e)
349342
{

projects/RabbitMQ.Client/client/impl/SslHelper.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ private SslHelper(SslOption sslOption)
5353
/// <summary>
5454
/// Upgrade a Tcp stream to an Ssl stream using the TLS options provided.
5555
/// </summary>
56+
// TODO async
5657
public static Stream TcpUpgrade(Stream tcpStream, SslOption options)
5758
{
5859
var helper = new SslHelper(options);
@@ -67,8 +68,9 @@ public static Stream TcpUpgrade(Stream tcpStream, SslOption options)
6768
Action<SslOption> TryAuthenticating = (SslOption opts) =>
6869
{
6970
sslStream.AuthenticateAsClientAsync(opts.ServerName, opts.Certs, opts.Version,
70-
opts.CheckCertificateRevocation).GetAwaiter().GetResult();
71+
opts.CheckCertificateRevocation).EnsureCompleted();
7172
};
73+
7274
try
7375
{
7476
// TODO async

projects/Test/SequentialIntegration/TestConnectionRecoveryBase.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -255,12 +255,7 @@ internal void RestartServerAndWaitForRecovery(AutorecoveringConnection conn)
255255
protected bool WaitForConfirms(IChannel m)
256256
{
257257
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4));
258-
/*
259-
* https://devblogs.microsoft.com/dotnet/configureawait-faq/
260-
* I'm using GetAwaiter().GetResult(). Do I need to use ConfigureAwait(false)?
261-
* Answer: No
262-
*/
263-
return m.WaitForConfirmsAsync(cts.Token).GetAwaiter().GetResult();
258+
return m.WaitForConfirmsAsync(cts.Token).EnsureCompleted();
264259
}
265260

266261
protected void WaitForRecovery()

0 commit comments

Comments
 (0)