Skip to content

Commit 486a587

Browse files
committed
Fix StreamPipeReader to better emulate other PipeReader implementations
Add many more StreamPipeReader tests and fix up some behaviors
1 parent ca5e5aa commit 486a587

File tree

8 files changed

+196
-46
lines changed

8 files changed

+196
-46
lines changed

ext/MessagePack

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Subproject commit a3e2a28e785fab6a413164e5bec278a7b83758d6
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright (c) Andrew Arnott. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE.txt file in the project root for full license information.
3+
4+
using System.IO;
5+
using System.IO.Pipelines;
6+
using Xunit.Abstractions;
7+
8+
public class IOPipelinesStreamPipeReaderTests : StreamPipeReaderTestBase
9+
{
10+
public IOPipelinesStreamPipeReaderTests(ITestOutputHelper logger)
11+
: base(logger)
12+
{
13+
}
14+
15+
protected override PipeReader CreatePipeReader(Stream stream, int hintSize = 0) => PipeReader.Create(stream, new StreamPipeReaderOptions(bufferSize: hintSize == 0 ? -1 : hintSize));
16+
}

src/Nerdbank.Streams.Tests/StreamPipeReaderTestBase.cs

Lines changed: 145 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT license. See LICENSE.txt file in the project root for full license information.
33

44
using System;
5+
using System.Buffers;
56
using System.Collections.Generic;
67
using System.IO;
78
using System.IO.Pipelines;
@@ -22,24 +23,19 @@ public StreamPipeReaderTestBase(ITestOutputHelper logger)
2223
{
2324
}
2425

26+
protected virtual bool EmulatePipelinesStreamPipeReader => true;
27+
2528
[Fact]
2629
public void ThrowsOnNull()
2730
{
2831
Assert.Throws<ArgumentNullException>(() => this.CreatePipeReader((Stream)null!));
2932
}
3033

31-
[Fact]
32-
public void NonReadableStream()
33-
{
34-
var unreadableStream = new Mock<Stream>(MockBehavior.Strict);
35-
unreadableStream.SetupGet(s => s.CanRead).Returns(false);
36-
Assert.Throws<ArgumentException>(() => this.CreatePipeReader(unreadableStream.Object));
37-
unreadableStream.VerifyAll();
38-
}
39-
40-
[Fact]
34+
[SkippableFact]
4135
public async Task Stream()
4236
{
37+
Skip.If(this is IOPipelinesStreamPipeReaderTests, "OnWriterCompleted isn't supported.");
38+
4339
byte[] expectedBuffer = this.GetRandomBuffer(2048);
4440
var stream = new MemoryStream(expectedBuffer);
4541
var reader = this.CreatePipeReader(stream, sizeHint: 50);
@@ -141,8 +137,110 @@ public async Task TryRead()
141137
}
142138

143139
[Fact]
140+
public void TryRead_FalseStillTurnsOnReadingMode()
141+
{
142+
var reader = this.CreatePipeReader(new MemoryStream(new byte[] { 1, 2, 3 }));
143+
144+
bool tryReadResult = reader.TryRead(out var readResult);
145+
146+
// The non-strict PipeReader is timing sensitive and may or may not be able to synchronously produce a read.
147+
// So only assert the False result for the strict readers.
148+
if (this.EmulatePipelinesStreamPipeReader)
149+
{
150+
Assert.False(tryReadResult);
151+
}
152+
153+
reader.AdvanceTo(readResult.Buffer.End);
154+
}
155+
156+
[Fact]
157+
public void TryRead_FalseCanBeCalledRepeatedly()
158+
{
159+
var reader = this.CreatePipeReader(new MemoryStream(new byte[] { 1, 2, 3 }));
160+
161+
// Verify that it's safe to call TryRead repeatedly when it returns False.
162+
Assert.False(reader.TryRead(out var readResult));
163+
Assert.False(reader.TryRead(out readResult));
164+
}
165+
166+
[Fact]
167+
public async Task TryRead_AdvanceTo_AfterEndReached()
168+
{
169+
var reader = this.CreatePipeReader(new MemoryStream(new byte[] { 1, 2, 3 }));
170+
171+
var readResult = await reader.ReadAsync(this.TimeoutToken);
172+
reader.AdvanceTo(readResult.Buffer.End);
173+
174+
reader.TryRead(out readResult);
175+
reader.AdvanceTo(readResult.Buffer.End);
176+
}
177+
178+
[Theory, PairwiseData]
179+
public async Task ReadAsync_TwiceInARow(bool emptyStream)
180+
{
181+
var stream = new MemoryStream(emptyStream ? Array.Empty<byte>() : new byte[3]);
182+
var reader = this.CreatePipeReader(stream, sizeHint: 50);
183+
var result1 = await reader.ReadAsync(this.TimeoutToken);
184+
if (emptyStream)
185+
{
186+
Assert.True(result1.IsCompleted);
187+
}
188+
189+
if (this.EmulatePipelinesStreamPipeReader)
190+
{
191+
var result2 = await reader.ReadAsync(this.TimeoutToken);
192+
Assert.Equal(result1.Buffer.Length, result2.Buffer.Length);
193+
Assert.Equal(emptyStream, result2.IsCompleted);
194+
}
195+
else
196+
{
197+
await Assert.ThrowsAsync<InvalidOperationException>(async () => await reader.ReadAsync(this.TimeoutToken));
198+
}
199+
}
200+
201+
[Theory, PairwiseData]
202+
public async Task TryRead_TwiceInARow(bool emptyStream)
203+
{
204+
var stream = new MemoryStream(emptyStream ? Array.Empty<byte>() : new byte[3]);
205+
var reader = this.CreatePipeReader(stream, sizeHint: 50);
206+
207+
// Read asynchronously once to guarantee that there's a non-empty buffer in the reader.
208+
var readResult = await reader.ReadAsync(this.TimeoutToken);
209+
reader.AdvanceTo(readResult.Buffer.Start);
210+
211+
Assert.Equal(!emptyStream || !this.EmulatePipelinesStreamPipeReader, reader.TryRead(out _));
212+
if (this.EmulatePipelinesStreamPipeReader)
213+
{
214+
Assert.Equal(!emptyStream, reader.TryRead(out _));
215+
}
216+
else
217+
{
218+
Assert.Throws<InvalidOperationException>(() => reader.TryRead(out _));
219+
}
220+
}
221+
222+
[Fact]
223+
public void AdvanceTo_BeforeRead()
224+
{
225+
var stream = new MemoryStream();
226+
var reader = this.CreatePipeReader(stream, sizeHint: 50);
227+
if (this.EmulatePipelinesStreamPipeReader)
228+
{
229+
reader.AdvanceTo(default);
230+
}
231+
else
232+
{
233+
Assert.Throws<InvalidOperationException>(() => reader.AdvanceTo(default));
234+
}
235+
236+
var ex = Assert.ThrowsAny<Exception>(() => reader.AdvanceTo(ReadOnlySequence<byte>.Empty.Start));
237+
Assert.True(ex is InvalidCastException || ex is InvalidOperationException);
238+
}
239+
240+
[SkippableFact]
144241
public async Task OnWriterCompleted()
145242
{
243+
Skip.If(this is IOPipelinesStreamPipeReaderTests, "OnWriterCompleted isn't supported.");
146244
byte[] expectedBuffer = this.GetRandomBuffer(50);
147245
var stream = new MemoryStream(expectedBuffer);
148246
var reader = this.CreatePipeReader(stream, sizeHint: 50);
@@ -184,19 +282,50 @@ public async Task CancelPendingRead_WithCancellationToken()
184282
}
185283

186284
[Fact]
187-
public async Task Complete_DoesNotCauseStreamDisposal()
285+
public async Task Complete_MayCauseStreamDisposal()
188286
{
189287
var stream = new HalfDuplexStream();
190-
var reader = this.CreatePipeReader(stream);
288+
var ms = new MonitoringStream(stream);
289+
var disposal = new AsyncManualResetEvent();
290+
ms.Disposed += (s, e) => disposal.Set();
291+
var reader = this.CreatePipeReader(ms);
191292
reader.Complete();
192293

193-
var timeout = ExpectedTimeoutToken;
194-
while (!stream.IsDisposed && !timeout.IsCancellationRequested)
294+
if (this is IOPipelinesStreamPipeReaderTests)
295+
{
296+
await disposal.WaitAsync(this.TimeoutToken);
297+
}
298+
else
195299
{
196-
await Task.Yield();
300+
await Assert.ThrowsAsync<OperationCanceledException>(() => disposal.WaitAsync(ExpectedTimeoutToken));
197301
}
302+
}
303+
304+
[Fact]
305+
public async Task CancelPendingRead()
306+
{
307+
var stream = new HalfDuplexStream();
308+
var reader = this.CreatePipeReader(stream, sizeHint: 50);
198309

199-
Assert.False(stream.IsDisposed);
310+
ValueTask<ReadResult> readTask = reader.ReadAsync(this.TimeoutToken);
311+
reader.CancelPendingRead();
312+
var readResult = await readTask.AsTask().WithCancellation(this.TimeoutToken);
313+
Assert.True(readResult.IsCanceled);
314+
315+
// Verify we can read after that without cancellation.
316+
readTask = reader.ReadAsync(this.TimeoutToken);
317+
stream.Write(new byte[] { 1, 2, 3 }, 0, 3);
318+
await stream.FlushAsync(this.TimeoutToken);
319+
readResult = await readTask;
320+
Assert.False(readResult.IsCanceled);
321+
Assert.Equal(3, readResult.Buffer.Length);
322+
reader.AdvanceTo(readResult.Buffer.End);
323+
324+
// Now cancel again
325+
readTask = reader.ReadAsync(this.TimeoutToken);
326+
reader.CancelPendingRead();
327+
readResult = await readTask;
328+
Assert.True(readResult.IsCanceled);
200329
}
201330

202331
protected abstract PipeReader CreatePipeReader(Stream stream, int sizeHint = 0);

src/Nerdbank.Streams.Tests/StreamUsePipeReaderTests.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public StreamUsePipeReaderTests(ITestOutputHelper logger)
2222
{
2323
}
2424

25+
protected override bool EmulatePipelinesStreamPipeReader => false;
26+
2527
[Fact]
2628
public async Task StreamFails()
2729
{
@@ -51,5 +53,14 @@ public async Task Complete_CausesWriterCompletion()
5153
await writerCompletion.WithCancellation(this.TimeoutToken);
5254
}
5355

56+
[Fact]
57+
public void NonReadableStream()
58+
{
59+
var unreadableStream = new Mock<Stream>(MockBehavior.Strict);
60+
unreadableStream.SetupGet(s => s.CanRead).Returns(false);
61+
Assert.Throws<ArgumentException>(() => this.CreatePipeReader(unreadableStream.Object));
62+
unreadableStream.VerifyAll();
63+
}
64+
5465
protected override PipeReader CreatePipeReader(Stream stream, int hintSize = 0) => stream.UsePipeReader(hintSize);
5566
}

src/Nerdbank.Streams.Tests/StreamUseStrictPipeReaderTests.cs

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -41,33 +41,5 @@ public async Task StreamFails()
4141
Assert.Same(expectedException, actualException);
4242
}
4343

44-
[Fact] // Bizarre behavior when using the built-in Pipe class: https://github.com/dotnet/corefx/issues/31696
45-
public async Task CancelPendingRead()
46-
{
47-
var stream = new HalfDuplexStream();
48-
var reader = this.CreatePipeReader(stream, sizeHint: 50);
49-
50-
ValueTask<ReadResult> readTask = reader.ReadAsync(this.TimeoutToken);
51-
reader.CancelPendingRead();
52-
var readResult = await readTask.AsTask().WithCancellation(this.TimeoutToken);
53-
Assert.True(readResult.IsCanceled);
54-
////reader.AdvanceTo(readResult.Buffer.End);
55-
56-
// Verify we can read after that without cancellation.
57-
readTask = reader.ReadAsync(this.TimeoutToken);
58-
stream.Write(new byte[] { 1, 2, 3 }, 0, 3);
59-
await stream.FlushAsync(this.TimeoutToken);
60-
readResult = await readTask;
61-
Assert.False(readResult.IsCanceled);
62-
Assert.Equal(3, readResult.Buffer.Length);
63-
reader.AdvanceTo(readResult.Buffer.End);
64-
65-
// Now cancel again
66-
readTask = reader.ReadAsync(this.TimeoutToken);
67-
reader.CancelPendingRead();
68-
readResult = await readTask;
69-
Assert.True(readResult.IsCanceled);
70-
}
71-
7244
protected override PipeReader CreatePipeReader(Stream stream, int sizeHint = 0) => stream.UseStrictPipeReader(sizeHint);
7345
}

src/Nerdbank.Streams/PipeExtensions.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public static PipeReader UsePipeReader(this Stream stream, int sizeHint = 0, Pip
7878

7979
/// <summary>
8080
/// Creates a <see cref="PipeReader"/> that reads from the specified <see cref="Stream"/> exactly as told to do so.
81+
/// It does *not* close the <paramref name="stream"/> when completed.
8182
/// </summary>
8283
/// <param name="stream">The stream to read from using a pipe.</param>
8384
/// <param name="sizeHint">A hint at the size of messages that are commonly transferred. Use 0 for a commonly reasonable default.</param>
@@ -91,7 +92,7 @@ public static PipeReader UseStrictPipeReader(this Stream stream, int sizeHint =
9192
Requires.NotNull(stream, nameof(stream));
9293
Requires.Argument(stream.CanRead, nameof(stream), "Stream must be readable.");
9394

94-
return new StreamPipeReader(stream, sizeHint);
95+
return new StreamPipeReader(stream, sizeHint, leaveOpen: true);
9596
}
9697

9798
/// <summary>

src/Nerdbank.Streams/StreamPipeReader.cs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
namespace Nerdbank.Streams
55
{
66
using System;
7+
using System.Buffers;
78
using System.Collections.Generic;
89
using System.IO;
910
using System.IO.Pipelines;
@@ -19,10 +20,17 @@ internal class StreamPipeReader : PipeReader
1920
{
2021
private readonly object syncObject = new object();
2122

23+
/// <summary><inheritdoc cref="StreamPipeReader(Stream, int, bool)" path="/param[@name='stream']"/></summary>
2224
private readonly Stream stream;
2325

26+
/// <summary>
27+
/// May be 0 for a reasonable default as determined by the <see cref="IBufferWriter{T}.GetMemory"/> method.
28+
/// </summary>
2429
private readonly int bufferSize;
2530

31+
/// <summary><inheritdoc cref="StreamPipeReader(Stream, int, bool)" path="/param[@name='leaveOpen']"/></summary>
32+
private readonly bool leaveOpen;
33+
2634
private readonly Sequence<byte> buffer = new Sequence<byte>();
2735

2836
private SequencePosition examined;
@@ -39,12 +47,19 @@ internal class StreamPipeReader : PipeReader
3947

4048
private List<(Action<Exception?, object?>, object?)>? writerCompletedCallbacks;
4149

42-
internal StreamPipeReader(Stream stream, int bufferSize)
50+
/// <summary>
51+
/// Initializes a new instance of the <see cref="StreamPipeReader"/> class.
52+
/// </summary>
53+
/// <param name="stream">The stream to read from.</param>
54+
/// <param name="bufferSize">A hint at the size of messages that are commonly transferred. Use 0 for a commonly reasonable default.</param>
55+
/// <param name="leaveOpen"><c>true</c> to leave the underlying <paramref name="stream"/> open after calling <see cref="PipeReader.Complete(Exception)"/>; <c>false</c> to close the stream.</param>
56+
internal StreamPipeReader(Stream stream, int bufferSize, bool leaveOpen)
4357
{
4458
Requires.NotNull(stream, nameof(stream));
4559
Requires.Argument(stream.CanRead, nameof(stream), "Stream must be readable.");
4660
this.stream = stream;
4761
this.bufferSize = bufferSize;
62+
this.leaveOpen = leaveOpen;
4863
}
4964

5065
/// <inheritdoc />
@@ -71,6 +86,10 @@ public override void Complete(Exception? exception = null)
7186
this.isReaderCompleted = true;
7287
this.readerException = exception;
7388
this.buffer.Reset();
89+
if (!this.leaveOpen)
90+
{
91+
this.stream.Dispose();
92+
}
7493
}
7594
}
7695

src/shipping.ruleset

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
</Rules>
7171
<Rules AnalyzerId="StyleCop.Analyzers" RuleNamespace="StyleCop.Analyzers">
7272
<Rule Id="SA1130" Action="None" />
73+
<Rule Id="SA1133" Action="Hidden" />
7374
<Rule Id="SA1600" Action="Hidden" />
7475
</Rules>
7576
<Rules AnalyzerId="Microsoft.VisualStudio.Threading.Analyzers" RuleNamespace="Microsoft.VisualStudio.Threading.Analyzers">

0 commit comments

Comments
 (0)