Skip to content

Commit 5f6aafc

Browse files
authored
3.x: Fix groupBy not requesting more if a group is cancelled w/ items (#6895)
1 parent f1a795d commit 5f6aafc

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ public void request(long n) {
401401
public void cancel() {
402402
if (cancelled.compareAndSet(false, true)) {
403403
cancelParent();
404+
drain();
404405
}
405406
}
406407

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2444,4 +2444,38 @@ public void accept(Integer v) throws Throwable {
24442444
.assertNoErrors()
24452445
.assertComplete();
24462446
}
2447+
2448+
@Test
2449+
public void cancelledGroupResumesRequesting() {
2450+
final List<TestSubscriber<Integer>> tss = new ArrayList<>();
2451+
final AtomicInteger counter = new AtomicInteger();
2452+
final AtomicBoolean done = new AtomicBoolean();
2453+
Flowable.range(1, 1000)
2454+
.doOnNext(new Consumer<Integer>() {
2455+
@Override
2456+
public void accept(Integer v) throws Exception {
2457+
counter.getAndIncrement();
2458+
}
2459+
})
2460+
.groupBy(Functions.justFunction(1))
2461+
.subscribe(new Consumer<GroupedFlowable<Integer, Integer>>() {
2462+
@Override
2463+
public void accept(GroupedFlowable<Integer, Integer> v) throws Exception {
2464+
TestSubscriber<Integer> ts = TestSubscriber.create(0L);
2465+
tss.add(ts);
2466+
v.subscribe(ts);
2467+
}
2468+
}, Functions.emptyConsumer(), new Action() {
2469+
@Override
2470+
public void run() throws Exception {
2471+
done.set(true);
2472+
}
2473+
});
2474+
2475+
while (!done.get()) {
2476+
tss.remove(0).cancel();
2477+
}
2478+
2479+
assertEquals(1000, counter.get());
2480+
}
24472481
}

0 commit comments

Comments
 (0)