Skip to content

Commit 7c0793d

Browse files
authored
3.x: Fix parallel() on grouped flowable not replenishing properly (#6719)
* 3.x: Fix parallel() on grouped flowable not replenishing properly * Remove accidental import * Avoid calling `isEmpty` * Undo some of the parallel changes * Undo all changes to ParallelFromPublisher * Again, undo
1 parent 5026999 commit 7c0793d

File tree

3 files changed

+60
-6
lines changed

3 files changed

+60
-6
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -743,19 +743,27 @@ public T poll() {
743743
produced++;
744744
return v;
745745
}
746+
tryReplenish();
747+
return null;
748+
}
749+
750+
@Override
751+
public boolean isEmpty() {
752+
if (queue.isEmpty()) {
753+
tryReplenish();
754+
return true;
755+
}
756+
return false;
757+
}
758+
759+
void tryReplenish() {
746760
int p = produced;
747761
if (p != 0) {
748762
produced = 0;
749763
if ((once.get() & ABANDONED) == 0) {
750764
parent.upstream.request(p);
751765
}
752766
}
753-
return null;
754-
}
755-
756-
@Override
757-
public boolean isEmpty() {
758-
return queue.isEmpty();
759767
}
760768

761769
@Override

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2413,4 +2413,34 @@ public void run() {
24132413
}
24142414
}
24152415
}
2416+
2417+
@Test
2418+
public void fusedParallelGroupProcessing() {
2419+
Flowable.range(0, 500000)
2420+
.subscribeOn(Schedulers.single())
2421+
.groupBy(new Function<Integer, Integer>() {
2422+
@Override
2423+
public Integer apply(Integer i) throws Throwable {
2424+
return i % 2;
2425+
}
2426+
})
2427+
.flatMap(new Function<GroupedFlowable<Integer, Integer>, Publisher<Integer>>() {
2428+
@Override
2429+
public Publisher<Integer> apply(GroupedFlowable<Integer, Integer> g) {
2430+
return g.getKey() == 0
2431+
? g
2432+
.parallel()
2433+
.runOn(Schedulers.computation())
2434+
.map(Functions.<Integer>identity())
2435+
.sequential()
2436+
: g.map(Functions.<Integer>identity()) // no need to use hide
2437+
;
2438+
}
2439+
})
2440+
.test()
2441+
.awaitDone(20, TimeUnit.SECONDS)
2442+
.assertValueCount(500000)
2443+
.assertComplete()
2444+
.assertNoErrors();
2445+
}
24162446
}

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1972,4 +1972,20 @@ public void fusedNoConcurrentCleanDueToCancel() {
19721972
}
19731973
}
19741974
}
1975+
1976+
@Test
1977+
public void fusedParallelProcessing() {
1978+
Flowable.range(0, 500000)
1979+
.subscribeOn(Schedulers.single())
1980+
.observeOn(Schedulers.computation())
1981+
.parallel()
1982+
.runOn(Schedulers.computation())
1983+
.map(Functions.<Integer>identity())
1984+
.sequential()
1985+
.test()
1986+
.awaitDone(20, TimeUnit.SECONDS)
1987+
.assertValueCount(500000)
1988+
.assertComplete()
1989+
.assertNoErrors();
1990+
}
19751991
}

0 commit comments

Comments
 (0)