@@ -572,8 +572,6 @@ public void expectNone(long withinMillis, String errMsgPrefix) throws Interrupte
572
572
573
573
public static class ManualSubscriberWithSubscriptionSupport <T > extends ManualSubscriber <T > {
574
574
575
- private final AtomicBoolean onSubscribeCalled = new AtomicBoolean ();
576
-
577
575
public ManualSubscriberWithSubscriptionSupport (TestEnvironment env ) {
578
576
super (env );
579
577
}
@@ -591,7 +589,7 @@ public void onNext(T element) {
591
589
@ Override
592
590
public void onComplete () {
593
591
env .debug (this + "::onComplete()" );
594
- if (onSubscribeCalled . get ()) {
592
+ if (subscription . isCompleted ()) {
595
593
super .onComplete ();
596
594
} else {
597
595
env .flop ("Subscriber::onComplete() called before Subscriber::onSubscribe" );
@@ -603,7 +601,6 @@ public void onSubscribe(Subscription s) {
603
601
env .debug (String .format ("%s::onSubscribe(%s)" , this , s ));
604
602
if (!subscription .isCompleted ()) {
605
603
subscription .complete (s );
606
- onSubscribeCalled .set (true );
607
604
} else {
608
605
env .flop ("Subscriber::onSubscribe called on an already-subscribed Subscriber" );
609
606
}
@@ -612,7 +609,7 @@ public void onSubscribe(Subscription s) {
612
609
@ Override
613
610
public void onError (Throwable cause ) {
614
611
env .debug (String .format ("%s::onError(%s)" , this , cause ));
615
- if (onSubscribeCalled . get ()) {
612
+ if (subscription . isCompleted ()) {
616
613
super .onError (cause );
617
614
} else {
618
615
env .flop (cause , String .format ("Subscriber::onError(%s) called before Subscriber::onSubscribe" , cause ));
@@ -806,7 +803,7 @@ public void expectCancelling() throws InterruptedException {
806
803
public void expectCancelling (long timeoutMillis ) throws InterruptedException {
807
804
cancelled .expectClose (timeoutMillis , "Did not receive expected cancelling of upstream subscription" );
808
805
}
809
-
806
+
810
807
public boolean isCancelled () throws InterruptedException {
811
808
return cancelled .isClosed ();
812
809
}
0 commit comments