Skip to content

Commit 64bf2e7

Browse files
committed
Fix missing cancel() in tests that don't consume the entire source
1 parent bfdf028 commit 64bf2e7

File tree

2 files changed

+263
-36
lines changed

2 files changed

+263
-36
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
using NUnit.Framework;
2+
using System;
3+
using System.Collections.Concurrent;
4+
using System.Collections.Generic;
5+
using System.Diagnostics;
6+
using System.Linq;
7+
using System.Text;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
11+
namespace Reactive.Streams.TCK.Tests
12+
{
13+
[TestFixture]
14+
public class RangePublisherTest : PublisherVerification<int>
15+
{
16+
static readonly ConcurrentDictionary<int, string> stacks = new ConcurrentDictionary<int, string>();
17+
18+
static readonly ConcurrentDictionary<int, bool> states = new ConcurrentDictionary<int, bool>();
19+
20+
static int id;
21+
22+
[TearDown]
23+
public void AfterTest()
24+
{
25+
bool fail = false;
26+
StringBuilder b = new StringBuilder();
27+
foreach (var t in states)
28+
{
29+
if (!t.Value)
30+
{
31+
b.Append("\r\n-------------------------------");
32+
33+
b.Append("\r\nat ").Append(stacks[t.Key]);
34+
35+
fail = true;
36+
}
37+
}
38+
states.Clear();
39+
stacks.Clear();
40+
if (fail)
41+
{
42+
throw new InvalidOperationException("Cancellations were missing:" + b);
43+
}
44+
}
45+
46+
public RangePublisherTest() : base(new TestEnvironment(25))
47+
{
48+
}
49+
50+
public override IPublisher<int> CreatePublisher(long elements)
51+
{
52+
return new RangePublisher(1, elements);
53+
}
54+
55+
public override IPublisher<int> CreateFailedPublisher()
56+
{
57+
return null;
58+
}
59+
60+
internal sealed class RangePublisher : IPublisher<int>
61+
{
62+
63+
readonly string stacktrace;
64+
65+
readonly long start;
66+
67+
readonly long count;
68+
69+
internal RangePublisher(long start, long count)
70+
{
71+
this.stacktrace = Environment.StackTrace;
72+
this.start = start;
73+
this.count = count;
74+
}
75+
76+
public void Subscribe(ISubscriber<int> s)
77+
{
78+
if (s == null)
79+
{
80+
throw new ArgumentNullException();
81+
}
82+
83+
int ids = Interlocked.Increment(ref id);
84+
85+
RangeSubscription parent = new RangeSubscription(s, ids, start, start + count);
86+
stacks.AddOrUpdate(ids, (a) => stacktrace, (a, b) => stacktrace);
87+
states.AddOrUpdate(ids, (a) => false, (a, b) => false);
88+
s.OnSubscribe(parent);
89+
}
90+
91+
sealed class RangeSubscription : ISubscription
92+
{
93+
94+
readonly ISubscriber<int> actual;
95+
96+
readonly int ids;
97+
98+
readonly long end;
99+
100+
long index;
101+
102+
volatile bool cancelled;
103+
104+
long requested;
105+
106+
internal RangeSubscription(ISubscriber<int> actual, int ids, long start, long end)
107+
{
108+
this.actual = actual;
109+
this.ids = ids;
110+
this.index = start;
111+
this.end = end;
112+
}
113+
114+
115+
public void Request(long n)
116+
{
117+
if (!cancelled)
118+
{
119+
if (n <= 0L)
120+
{
121+
cancelled = true;
122+
states[ids] = true;
123+
actual.OnError(new ArgumentException("§3.9 violated"));
124+
return;
125+
}
126+
127+
for (;;)
128+
{
129+
long r = Volatile.Read(ref requested);
130+
long u = r + n;
131+
if (u < 0L)
132+
{
133+
u = long.MaxValue;
134+
}
135+
if (Interlocked.CompareExchange(ref requested, u, r) == r)
136+
{
137+
if (r == 0)
138+
{
139+
break;
140+
}
141+
return;
142+
}
143+
}
144+
145+
long idx = index;
146+
long f = end;
147+
148+
for (;;)
149+
{
150+
long e = 0;
151+
while (e != n && idx != f)
152+
{
153+
if (cancelled)
154+
{
155+
return;
156+
}
157+
158+
actual.OnNext((int)idx);
159+
160+
idx++;
161+
e++;
162+
}
163+
164+
if (idx == f)
165+
{
166+
if (!cancelled)
167+
{
168+
states[ids] = true;
169+
actual.OnComplete();
170+
}
171+
return;
172+
}
173+
174+
index = idx;
175+
n = Interlocked.Add(ref requested, -n);
176+
if (n == 0)
177+
{
178+
break;
179+
}
180+
}
181+
}
182+
}
183+
184+
public void Cancel()
185+
{
186+
cancelled = true;
187+
states[ids] = true;
188+
}
189+
}
190+
}
191+
}
192+
}

0 commit comments

Comments
 (0)