Skip to content

Commit e00c499

Browse files
authored
3.x: Fix mergeWith not cancelling the other source if the main errors (#6598)
* 3.x: Fix mergeWith not cancelling the other source if the main errors * Fix typo in the test name
1 parent 5c3d4c6 commit e00c499

12 files changed

+224
-9
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void onNext(T t) {
8686

8787
@Override
8888
public void onError(Throwable ex) {
89-
SubscriptionHelper.cancel(mainSubscription);
89+
DisposableHelper.dispose(otherObserver);
9090
HalfSerializer.onError(downstream, ex, this, error);
9191
}
9292

src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void onNext(T t) {
143143
@Override
144144
public void onError(Throwable ex) {
145145
if (error.addThrowable(ex)) {
146-
SubscriptionHelper.cancel(mainSubscription);
146+
DisposableHelper.dispose(otherObserver);
147147
drain();
148148
} else {
149149
RxJavaPlugins.onError(ex);

src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void onNext(T t) {
143143
@Override
144144
public void onError(Throwable ex) {
145145
if (error.addThrowable(ex)) {
146-
SubscriptionHelper.cancel(mainSubscription);
146+
DisposableHelper.dispose(otherObserver);
147147
drain();
148148
} else {
149149
RxJavaPlugins.onError(ex);

src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void onNext(T t) {
8080

8181
@Override
8282
public void onError(Throwable ex) {
83-
DisposableHelper.dispose(mainDisposable);
83+
DisposableHelper.dispose(otherObserver);
8484
HalfSerializer.onError(downstream, ex, this, error);
8585
}
8686

src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void onNext(T t) {
106106
@Override
107107
public void onError(Throwable ex) {
108108
if (error.addThrowable(ex)) {
109-
DisposableHelper.dispose(mainDisposable);
109+
DisposableHelper.dispose(otherObserver);
110110
drain();
111111
} else {
112112
RxJavaPlugins.onError(ex);

src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void onNext(T t) {
106106
@Override
107107
public void onError(Throwable ex) {
108108
if (error.addThrowable(ex)) {
109-
DisposableHelper.dispose(mainDisposable);
109+
DisposableHelper.dispose(otherObserver);
110110
drain();
111111
} else {
112112
RxJavaPlugins.onError(ex);

src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletableTest.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16-
import org.junit.Test;
17-
1816
import static org.junit.Assert.*;
1917

18+
import org.junit.Test;
19+
2020
import io.reactivex.*;
2121
import io.reactivex.exceptions.TestException;
2222
import io.reactivex.functions.Action;
@@ -137,4 +137,40 @@ public void run() {
137137
ts.assertResult(1);
138138
}
139139
}
140+
141+
@Test
142+
public void cancelOtherOnMainError() {
143+
PublishProcessor<Integer> pp = PublishProcessor.create();
144+
CompletableSubject cs = CompletableSubject.create();
145+
146+
TestSubscriber<Integer> ts = pp.mergeWith(cs).test();
147+
148+
assertTrue(pp.hasSubscribers());
149+
assertTrue(cs.hasObservers());
150+
151+
pp.onError(new TestException());
152+
153+
ts.assertFailure(TestException.class);
154+
155+
assertFalse("main has observers!", pp.hasSubscribers());
156+
assertFalse("other has observers", cs.hasObservers());
157+
}
158+
159+
@Test
160+
public void cancelMainOnOtherError() {
161+
PublishProcessor<Integer> pp = PublishProcessor.create();
162+
CompletableSubject cs = CompletableSubject.create();
163+
164+
TestSubscriber<Integer> ts = pp.mergeWith(cs).test();
165+
166+
assertTrue(pp.hasSubscribers());
167+
assertTrue(cs.hasObservers());
168+
169+
cs.onError(new TestException());
170+
171+
ts.assertFailure(TestException.class);
172+
173+
assertFalse("main has observers!", pp.hasSubscribers());
174+
assertFalse("other has observers", cs.hasObservers());
175+
}
140176
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybeTest.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import io.reactivex.internal.subscriptions.BooleanSubscription;
2929
import io.reactivex.plugins.RxJavaPlugins;
3030
import io.reactivex.processors.PublishProcessor;
31-
import io.reactivex.subjects.MaybeSubject;
31+
import io.reactivex.subjects.*;
3232
import io.reactivex.subscribers.TestSubscriber;
3333
import io.reactivex.testsupport.TestHelper;
3434

@@ -402,4 +402,40 @@ public void onNext(Integer t) {
402402
ts.assertValueCount(Flowable.bufferSize());
403403
ts.assertComplete();
404404
}
405+
406+
@Test
407+
public void cancelOtherOnMainError() {
408+
PublishProcessor<Integer> pp = PublishProcessor.create();
409+
MaybeSubject<Integer> ms = MaybeSubject.create();
410+
411+
TestSubscriber<Integer> ts = pp.mergeWith(ms).test();
412+
413+
assertTrue(pp.hasSubscribers());
414+
assertTrue(ms.hasObservers());
415+
416+
pp.onError(new TestException());
417+
418+
ts.assertFailure(TestException.class);
419+
420+
assertFalse("main has observers!", pp.hasSubscribers());
421+
assertFalse("other has observers", ms.hasObservers());
422+
}
423+
424+
@Test
425+
public void cancelMainOnOtherError() {
426+
PublishProcessor<Integer> pp = PublishProcessor.create();
427+
MaybeSubject<Integer> ms = MaybeSubject.create();
428+
429+
TestSubscriber<Integer> ts = pp.mergeWith(ms).test();
430+
431+
assertTrue(pp.hasSubscribers());
432+
assertTrue(ms.hasObservers());
433+
434+
ms.onError(new TestException());
435+
436+
ts.assertFailure(TestException.class);
437+
438+
assertFalse("main has observers!", pp.hasSubscribers());
439+
assertFalse("other has observers", ms.hasObservers());
440+
}
405441
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingleTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,4 +398,40 @@ public void onNext(Integer t) {
398398
ts.assertValueCount(Flowable.bufferSize());
399399
ts.assertComplete();
400400
}
401+
402+
@Test
403+
public void cancelOtherOnMainError() {
404+
PublishProcessor<Integer> pp = PublishProcessor.create();
405+
SingleSubject<Integer> ss = SingleSubject.create();
406+
407+
TestSubscriber<Integer> ts = pp.mergeWith(ss).test();
408+
409+
assertTrue(pp.hasSubscribers());
410+
assertTrue(ss.hasObservers());
411+
412+
pp.onError(new TestException());
413+
414+
ts.assertFailure(TestException.class);
415+
416+
assertFalse("main has observers!", pp.hasSubscribers());
417+
assertFalse("other has observers", ss.hasObservers());
418+
}
419+
420+
@Test
421+
public void cancelMainOnOtherError() {
422+
PublishProcessor<Integer> pp = PublishProcessor.create();
423+
SingleSubject<Integer> ss = SingleSubject.create();
424+
425+
TestSubscriber<Integer> ts = pp.mergeWith(ss).test();
426+
427+
assertTrue(pp.hasSubscribers());
428+
assertTrue(ss.hasObservers());
429+
430+
ss.onError(new TestException());
431+
432+
ts.assertFailure(TestException.class);
433+
434+
assertFalse("main has observers!", pp.hasSubscribers());
435+
assertFalse("other has observers", ss.hasObservers());
436+
}
401437
}

src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletableTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,4 +136,40 @@ protected void subscribeActual(Observer<? super Integer> observer) {
136136
.test()
137137
.assertResult(1);
138138
}
139+
140+
@Test
141+
public void cancelOtherOnMainError() {
142+
PublishSubject<Integer> ps = PublishSubject.create();
143+
CompletableSubject cs = CompletableSubject.create();
144+
145+
TestObserver<Integer> to = ps.mergeWith(cs).test();
146+
147+
assertTrue(ps.hasObservers());
148+
assertTrue(cs.hasObservers());
149+
150+
ps.onError(new TestException());
151+
152+
to.assertFailure(TestException.class);
153+
154+
assertFalse("main has observers!", ps.hasObservers());
155+
assertFalse("other has observers", cs.hasObservers());
156+
}
157+
158+
@Test
159+
public void cancelMainOnOtherError() {
160+
PublishSubject<Integer> ps = PublishSubject.create();
161+
CompletableSubject cs = CompletableSubject.create();
162+
163+
TestObserver<Integer> to = ps.mergeWith(cs).test();
164+
165+
assertTrue(ps.hasObservers());
166+
assertTrue(cs.hasObservers());
167+
168+
cs.onError(new TestException());
169+
170+
to.assertFailure(TestException.class);
171+
172+
assertFalse("main has observers!", ps.hasObservers());
173+
assertFalse("other has observers", cs.hasObservers());
174+
}
139175
}

src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybeTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,4 +273,39 @@ public void onNext(Integer t) {
273273
to.assertResult(0, 1, 2, 3, 4);
274274
}
275275

276+
@Test
277+
public void cancelOtherOnMainError() {
278+
PublishSubject<Integer> ps = PublishSubject.create();
279+
MaybeSubject<Integer> ms = MaybeSubject.create();
280+
281+
TestObserver<Integer> to = ps.mergeWith(ms).test();
282+
283+
assertTrue(ps.hasObservers());
284+
assertTrue(ms.hasObservers());
285+
286+
ps.onError(new TestException());
287+
288+
to.assertFailure(TestException.class);
289+
290+
assertFalse("main has observers!", ps.hasObservers());
291+
assertFalse("other has observers", ms.hasObservers());
292+
}
293+
294+
@Test
295+
public void cancelMainOnOtherError() {
296+
PublishSubject<Integer> ps = PublishSubject.create();
297+
MaybeSubject<Integer> ms = MaybeSubject.create();
298+
299+
TestObserver<Integer> to = ps.mergeWith(ms).test();
300+
301+
assertTrue(ps.hasObservers());
302+
assertTrue(ms.hasObservers());
303+
304+
ms.onError(new TestException());
305+
306+
to.assertFailure(TestException.class);
307+
308+
assertFalse("main has observers!", ps.hasObservers());
309+
assertFalse("other has observers", ms.hasObservers());
310+
}
276311
}

src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingleTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,4 +264,40 @@ public void onNext(Integer t) {
264264

265265
to.assertResult(0, 1, 2, 3, 4);
266266
}
267+
268+
@Test
269+
public void cancelOtherOnMainError() {
270+
PublishSubject<Integer> ps = PublishSubject.create();
271+
SingleSubject<Integer> ss = SingleSubject.create();
272+
273+
TestObserver<Integer> to = ps.mergeWith(ss).test();
274+
275+
assertTrue(ps.hasObservers());
276+
assertTrue(ss.hasObservers());
277+
278+
ps.onError(new TestException());
279+
280+
to.assertFailure(TestException.class);
281+
282+
assertFalse("main has observers!", ps.hasObservers());
283+
assertFalse("other has observers", ss.hasObservers());
284+
}
285+
286+
@Test
287+
public void cancelMainOnOtherError() {
288+
PublishSubject<Integer> ps = PublishSubject.create();
289+
SingleSubject<Integer> ss = SingleSubject.create();
290+
291+
TestObserver<Integer> to = ps.mergeWith(ss).test();
292+
293+
assertTrue(ps.hasObservers());
294+
assertTrue(ss.hasObservers());
295+
296+
ss.onError(new TestException());
297+
298+
to.assertFailure(TestException.class);
299+
300+
assertFalse("main has observers!", ps.hasObservers());
301+
assertFalse("other has observers", ss.hasObservers());
302+
}
267303
}

0 commit comments

Comments
 (0)