forked from reactive-streams/reactive-streams-dotnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSyncSubscriberWhiteboxTest.cs
69 lines (54 loc) · 2.02 KB
/
SyncSubscriberWhiteboxTest.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
using System;
using NUnit.Framework;
using Reactive.Streams.TCK;
namespace Reactive.Streams.Example.Unicast.Tests
{
[TestFixture]
public class SyncSubscriberWhiteboxTest : SubscriberWhiteboxVerification<int>
{
public SyncSubscriberWhiteboxTest() : base(new TestEnvironment())
{
}
public override int CreateElement(int element) => element;
public override ISubscriber<int> CreateSubscriber(WhiteboxSubscriberProbe<int> probe) => new Subscriber(probe);
private sealed class Subscriber : SyncSubscriber<int>
{
private readonly WhiteboxSubscriberProbe<int> _probe;
public Subscriber(WhiteboxSubscriberProbe<int> probe)
{
_probe = probe;
}
public override void OnSubscribe(ISubscription subscription)
{
base.OnSubscribe(subscription);
_probe.RegisterOnSubscribe(new SubscriberPuppet(subscription));
}
private sealed class SubscriberPuppet : ISubscriberPuppet
{
private readonly ISubscription _subscription;
public SubscriberPuppet(ISubscription subscription)
{
_subscription = subscription;
}
public void TriggerRequest(long elements) => _subscription.Request(elements);
public void SignalCancel() => _subscription.Cancel();
}
public override void OnNext(int element)
{
base.OnNext(element);
_probe.RegisterOnNext(element);
}
protected override bool WhenNext(int element) => true;
public override void OnError(Exception cause)
{
base.OnError(cause);
_probe.RegisterOnError(cause);
}
public override void OnComplete()
{
base.OnComplete();
_probe.RegisterOnComplete();
}
}
}
}