Skip to content

Commit ca5e5aa

Browse files
committed
Fix NestedPipeReader.AdvanceTo after ReadAsync encounters end of underlying PipeReader
This includes a behavioral breaking change that brings it in line with the behavior of `DefaultPipeReader`, which is that we fully expect `AdvanceTo` to be called even when the end of the `NestedPipeReader` is reached. Otherwise the underlying `PipeReader.ReadAsync` will throw when next called. Fixes #204
1 parent d822e69 commit ca5e5aa

File tree

2 files changed

+136
-72
lines changed

2 files changed

+136
-72
lines changed

src/Nerdbank.Streams.Tests/NestedPipeReaderTests.cs

Lines changed: 106 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public void TryRead_AllAtOnce_ExamineEverything()
4646
Assert.True(sliceReader.TryRead(out readResult));
4747
Assert.True(readResult.IsCompleted);
4848
Assert.True(readResult.Buffer.IsEmpty);
49+
sliceReader.AdvanceTo(readResult.Buffer.End);
4950

5051
// Verify that the original PipeReader can still produce bytes.
5152
Assert.True(this.pipe.Reader.TryRead(out readResult));
@@ -65,6 +66,7 @@ public void TryRead_AllAtOnce()
6566
Assert.True(sliceReader.TryRead(out readResult));
6667
Assert.True(readResult.IsCompleted);
6768
Assert.Equal(0, readResult.Buffer.Length);
69+
sliceReader.AdvanceTo(readResult.Buffer.End);
6870

6971
// Verify that the original PipeReader can still produce bytes.
7072
Assert.True(this.pipe.Reader.TryRead(out readResult));
@@ -113,6 +115,7 @@ public async Task ReadAsync_AllAtOnce()
113115
readResult = await sliceReader.ReadAsync(this.TimeoutToken);
114116
Assert.True(readResult.IsCompleted);
115117
Assert.True(readResult.Buffer.IsEmpty);
118+
sliceReader.AdvanceTo(readResult.Buffer.End);
116119

117120
// Verify that the original PipeReader can still produce bytes.
118121
Assert.True(this.pipe.Reader.TryRead(out readResult));
@@ -138,6 +141,7 @@ public async Task ReadAsync_ExamineEverything()
138141
readResult = await sliceReader.ReadAsync(this.TimeoutToken);
139142
Assert.True(readResult.IsCompleted);
140143
Assert.True(readResult.Buffer.IsEmpty);
144+
sliceReader.AdvanceTo(readResult.Buffer.End);
141145

142146
// Verify that the original PipeReader can still produce bytes.
143147
Assert.True(this.pipe.Reader.TryRead(out readResult));
@@ -164,6 +168,7 @@ public async Task ReadAsync_MultipleReads()
164168
readResult = await sliceReader.ReadAsync(this.TimeoutToken);
165169
Assert.True(readResult.IsCompleted);
166170
Assert.Equal(0, readResult.Buffer.Length);
171+
sliceReader.AdvanceTo(readResult.Buffer.End);
167172

168173
// Verify that the original PipeReader can still produce bytes.
169174
Assert.True(this.pipe.Reader.TryRead(out readResult));
@@ -186,6 +191,61 @@ public async Task ReadAsync_SliceExceedsUnderlyingLength()
186191
Assert.True(readResult.IsCompleted);
187192
}
188193

194+
[Fact]
195+
public async Task ReadAsync_TwiceOnCompletion_Throws()
196+
{
197+
this.pipe.Writer.Complete();
198+
199+
var sliceReader = this.pipe.Reader.ReadSlice(OriginalBuffer.Length);
200+
var readResult = await sliceReader.ReadAsync(this.TimeoutToken);
201+
sliceReader.AdvanceTo(readResult.Buffer.End);
202+
await sliceReader.ReadAsync(this.TimeoutToken);
203+
await Assert.ThrowsAsync<InvalidOperationException>(async () => await sliceReader.ReadAsync(this.TimeoutToken));
204+
}
205+
206+
[Fact]
207+
public async Task ReadAsync_ToEnd_AdvanceTo_Partial_ThenReadAsyncAgain()
208+
{
209+
this.pipe.Writer.Complete();
210+
211+
var sliceReader = this.pipe.Reader.ReadSlice(OriginalBuffer.Length);
212+
var readResult = await sliceReader.ReadAsync(this.TimeoutToken);
213+
Assert.True(readResult.IsCompleted);
214+
sliceReader.AdvanceTo(readResult.Buffer.GetPosition(3));
215+
216+
readResult = await sliceReader.ReadAsync(this.TimeoutToken);
217+
Assert.True(readResult.IsCompleted);
218+
sliceReader.AdvanceTo(readResult.Buffer.End);
219+
220+
readResult = await sliceReader.ReadAsync(this.TimeoutToken);
221+
sliceReader.AdvanceTo(readResult.Buffer.End);
222+
223+
readResult = await this.pipe.Reader.ReadAsync(this.TimeoutToken);
224+
Assert.True(readResult.IsCompleted);
225+
this.pipe.Reader.AdvanceTo(readResult.Buffer.End);
226+
}
227+
228+
[Fact]
229+
public async Task ReadAsync_AfterReadingExactBytes()
230+
{
231+
var slice = this.pipe.Reader.ReadSlice(OriginalBuffer.Length);
232+
var readResult = await slice.ReadAsync(this.TimeoutToken);
233+
Assert.True(readResult.IsCompleted);
234+
slice.AdvanceTo(readResult.Buffer.End);
235+
236+
// Try to read again...
237+
readResult = await slice.ReadAsync(this.TimeoutToken);
238+
Assert.True(readResult.Buffer.IsEmpty);
239+
Assert.True(readResult.IsCompleted);
240+
slice.AdvanceTo(readResult.Buffer.End);
241+
242+
// And again...
243+
readResult = await slice.ReadAsync(this.TimeoutToken);
244+
Assert.True(readResult.Buffer.IsEmpty);
245+
Assert.True(readResult.IsCompleted);
246+
slice.AdvanceTo(readResult.Buffer.End);
247+
}
248+
189249
[Fact]
190250
public void OnWriterCompleted_NoOps()
191251
{
@@ -199,30 +259,51 @@ public void OnWriterCompleted_NoOps()
199259
}
200260

201261
[Fact]
202-
public void TryRead_ThrowsAfterCompleting()
262+
public void TryRead_ThrowsAfterCompleting_Prematurely()
203263
{
204264
var sliceReader = this.pipe.Reader.ReadSlice(5);
205265
sliceReader.Complete();
206266
Assert.Throws<InvalidOperationException>(() => sliceReader.TryRead(out ReadResult result));
207267
}
208268

209269
[Fact]
210-
public async Task ReadAsync_ThrowsAfterCompleting()
270+
public async Task ReadAsync_ThrowsAfterCompleting_Prematurely()
211271
{
212272
var sliceReader = this.pipe.Reader.ReadSlice(5);
213273
sliceReader.Complete();
214274
await Assert.ThrowsAsync<InvalidOperationException>(() => sliceReader.ReadAsync(this.TimeoutToken).AsTask());
215275
}
216276

217277
[Fact]
218-
public void Complete_DoesNotCompleteUnderlyingReader()
278+
public void TryRead_ThrowsAfterCompleting_AfterFullRead()
219279
{
220280
var sliceReader = this.pipe.Reader.ReadSlice(5);
281+
Assert.True(sliceReader.TryRead(out ReadResult readResult));
282+
Assert.True(readResult.IsCompleted);
283+
sliceReader.AdvanceTo(readResult.Buffer.End);
221284
sliceReader.Complete();
222-
Assert.True(this.pipe.Reader.TryRead(out ReadResult result));
223-
Assert.Equal(OriginalBuffer.Length, result.Buffer.Length);
285+
Assert.Throws<InvalidOperationException>(() => sliceReader.TryRead(out ReadResult result));
286+
287+
Assert.True(this.pipe.Reader.TryRead(out readResult));
288+
Assert.False(readResult.Buffer.IsEmpty);
224289
this.pipe.Reader.Complete();
225-
Assert.Throws<InvalidOperationException>(() => this.pipe.Reader.TryRead(out result));
290+
Assert.Throws<InvalidOperationException>(() => this.pipe.Reader.TryRead(out ReadResult result));
291+
}
292+
293+
[Fact]
294+
public async Task ReadAsync_ThrowsAfterCompleting_AfterFullRead()
295+
{
296+
var sliceReader = this.pipe.Reader.ReadSlice(5);
297+
var readResult = await sliceReader.ReadAsync(this.TimeoutToken);
298+
Assert.True(readResult.IsCompleted);
299+
sliceReader.AdvanceTo(readResult.Buffer.End);
300+
sliceReader.Complete();
301+
await Assert.ThrowsAsync<InvalidOperationException>(() => sliceReader.ReadAsync(this.TimeoutToken).AsTask());
302+
303+
readResult = await this.pipe.Reader.ReadAsync(this.TimeoutToken);
304+
Assert.False(readResult.Buffer.IsEmpty);
305+
this.pipe.Reader.Complete();
306+
await Assert.ThrowsAsync<InvalidOperationException>(() => this.pipe.Reader.ReadAsync(this.TimeoutToken).AsTask());
226307
}
227308

228309
[Fact]
@@ -233,6 +314,25 @@ public void Complete_WithException_DoesCompleteUnderlyingReader()
233314
Assert.Throws<InvalidOperationException>(() => this.pipe.Reader.TryRead(out ReadResult result));
234315
}
235316

317+
[Fact]
318+
public void Complete_Twice_WithoutReading()
319+
{
320+
var sliceReader = this.pipe.Reader.ReadSlice(5);
321+
sliceReader.Complete();
322+
sliceReader.Complete();
323+
}
324+
325+
[Fact]
326+
public void Complete_Twice_AfterReading()
327+
{
328+
var sliceReader = this.pipe.Reader.ReadSlice(5);
329+
Assert.True(sliceReader.TryRead(out ReadResult result));
330+
Assert.True(result.IsCompleted);
331+
sliceReader.AdvanceTo(result.Buffer.End);
332+
sliceReader.Complete();
333+
sliceReader.Complete();
334+
}
335+
236336
[Fact]
237337
public void CancelPendingRead_UnderlyingReader_TryRead()
238338
{

src/Nerdbank.Streams/NestedPipeReader.cs

Lines changed: 30 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@ internal class NestedPipeReader : PipeReader
1818
private readonly PipeReader pipeReader;
1919
private long length;
2020
private long consumedLength;
21-
private long examinedLength;
2221
private ReadResult resultOfPriorRead;
23-
private bool nextReadCanceled;
24-
private bool readerCompleted;
22+
private bool completed;
2523

2624
public NestedPipeReader(PipeReader pipeReader, long length)
2725
{
@@ -35,52 +33,33 @@ public NestedPipeReader(PipeReader pipeReader, long length)
3533
private long RemainingLength => this.length - this.consumedLength;
3634

3735
/// <inheritdoc/>
38-
public override void AdvanceTo(SequencePosition consumed)
39-
{
40-
if (this.Consumed(consumed, consumed))
41-
{
42-
this.pipeReader.AdvanceTo(consumed);
43-
44-
// When we call AdvanceTo on the underlying reader, we're not allowed to reference their buffer any more, so clear it to be safe.
45-
this.resultOfPriorRead = new ReadResult(default, isCanceled: false, isCompleted: this.resultOfPriorRead.IsCompleted);
46-
}
47-
}
36+
public override void AdvanceTo(SequencePosition consumed) => this.AdvanceTo(consumed, consumed);
4837

4938
/// <inheritdoc/>
5039
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
5140
{
52-
if (this.Consumed(consumed, examined))
53-
{
54-
this.pipeReader.AdvanceTo(consumed, examined);
41+
this.consumedLength += this.resultOfPriorRead.Buffer.Slice(0, consumed).Length;
42+
this.pipeReader.AdvanceTo(consumed, examined);
5543

56-
// When we call AdvanceTo on the underlying reader, we're not allowed to reference their buffer any more, so clear it to be safe.
57-
this.resultOfPriorRead = new ReadResult(default, isCanceled: false, isCompleted: this.resultOfPriorRead.IsCompleted);
58-
}
44+
// When we call AdvanceTo on the underlying reader, we're not allowed to reference their buffer any more, so clear it to be safe.
45+
this.resultOfPriorRead = new ReadResult(default, isCanceled: false, isCompleted: this.resultOfPriorRead.IsCompleted);
5946
}
6047

6148
/// <inheritdoc/>
62-
public override void CancelPendingRead()
63-
{
64-
if (this.resultOfPriorRead.IsCompleted)
65-
{
66-
this.nextReadCanceled = true;
67-
}
68-
else
69-
{
70-
this.pipeReader.CancelPendingRead();
71-
}
72-
}
49+
public override void CancelPendingRead() => this.pipeReader.CancelPendingRead();
7350

7451
/// <inheritdoc/>
52+
/// <remarks>
53+
/// If the slice has not been fully read or if <paramref name="exception"/> is non-null, this call propagates to the underlying <see cref="PipeReader"/>.
54+
/// But if the slice is fully read without errors, the call is suppressed so the underlying <see cref="PipeReader"/> can continue to function.
55+
/// </remarks>
7556
public override void Complete(Exception? exception = null)
7657
{
77-
// We don't want a nested PipeReader to complete the underlying one unless there was an exception.
78-
if (exception != null)
58+
this.completed = true;
59+
if (exception is object || this.RemainingLength > 0)
7960
{
8061
this.pipeReader.Complete(exception);
8162
}
82-
83-
this.readerCompleted = true;
8463
}
8564

8665
/// <inheritdoc/>
@@ -92,18 +71,25 @@ public override void OnWriterCompleted(Action<Exception, object> callback, objec
9271
/// <inheritdoc/>
9372
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
9473
{
95-
Verify.Operation(!this.readerCompleted, Strings.ReadingAfterCompletionNotAllowed);
96-
if (this.resultOfPriorRead.IsCompleted)
74+
Verify.Operation(!this.completed, Strings.ReadingAfterCompletionNotAllowed);
75+
76+
ReadResult result;
77+
if (this.RemainingLength == 0)
78+
{
79+
// We do NOT want to block on reading more bytes since we don't expect or want any.
80+
// But we DO want to put the underlying reader back into a reading mode so we don't
81+
// have to emulate that in our class.
82+
this.pipeReader.TryRead(out result);
83+
}
84+
else
9785
{
98-
var cachedResult = this.resultOfPriorRead = new ReadResult(this.resultOfPriorRead.Buffer, isCanceled: this.nextReadCanceled, isCompleted: true);
99-
this.nextReadCanceled = false;
100-
return cachedResult;
86+
result = await this.pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false);
10187
}
10288

103-
var result = await this.pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false);
89+
// Do not allow the reader to exceed the length of this slice.
10490
if (result.Buffer.Length >= this.RemainingLength)
10591
{
106-
result = new ReadResult(result.Buffer.Slice(0, this.RemainingLength), isCanceled: false, isCompleted: true);
92+
result = new ReadResult(result.Buffer.Slice(0, this.RemainingLength), isCanceled: result.IsCanceled, isCompleted: true);
10793
}
10894

10995
return this.resultOfPriorRead = result;
@@ -112,19 +98,13 @@ public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancella
11298
/// <inheritdoc/>
11399
public override bool TryRead(out ReadResult result)
114100
{
115-
Verify.Operation(!this.readerCompleted, Strings.ReadingAfterCompletionNotAllowed);
116-
if (this.resultOfPriorRead.IsCompleted)
117-
{
118-
result = this.resultOfPriorRead = new ReadResult(this.resultOfPriorRead.Buffer, isCanceled: this.nextReadCanceled, isCompleted: true);
119-
this.nextReadCanceled = false;
120-
return true;
121-
}
122-
101+
Verify.Operation(!this.completed, Strings.ReadingAfterCompletionNotAllowed);
123102
if (this.pipeReader.TryRead(out result))
124103
{
104+
// Do not allow the reader to exceed the length of this slice.
125105
if (result.Buffer.Length > this.RemainingLength)
126106
{
127-
result = new ReadResult(result.Buffer.Slice(0, this.RemainingLength), isCanceled: false, isCompleted: true);
107+
result = new ReadResult(result.Buffer.Slice(0, this.RemainingLength), isCanceled: result.IsCanceled, isCompleted: true);
128108
}
129109

130110
this.resultOfPriorRead = result;
@@ -133,21 +113,5 @@ public override bool TryRead(out ReadResult result)
133113

134114
return false;
135115
}
136-
137-
private bool Consumed(SequencePosition consumed, SequencePosition examined)
138-
{
139-
this.consumedLength += this.resultOfPriorRead.Buffer.Slice(0, consumed).Length;
140-
this.examinedLength += this.resultOfPriorRead.Buffer.Slice(0, examined).Length;
141-
142-
if (this.resultOfPriorRead.IsCompleted)
143-
{
144-
// We have to keep our own ReadResult current since we can't call the inner read methods any more.
145-
this.resultOfPriorRead = new ReadResult(this.resultOfPriorRead.Buffer.Slice(consumed), isCanceled: false, isCompleted: true);
146-
}
147-
148-
// The caller is allowed to propagate the AdvanceTo call to the underlying reader iff
149-
// we will be reading again from it, or our own final buffer is empty.
150-
return !this.resultOfPriorRead.IsCompleted || this.resultOfPriorRead.Buffer.IsEmpty;
151-
}
152116
}
153117
}

0 commit comments

Comments
 (0)