Skip to content

Commit 0fcbf89

Browse files
committed
Merge pull request #3380 from akarnokd/CombineLatestCleanup2x
2.x: CombineLatest removed leftover debug field + better cleanup
2 parents d394c49 + ca20a2b commit 0fcbf89

File tree

2 files changed

+26
-6
lines changed

2 files changed

+26
-6
lines changed

src/main/java/io/reactivex/internal/operators/PublisherCombineLatest.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,18 @@ public void cancel() {
152152
}
153153

154154
void cancel(Queue<?> q) {
155-
q.clear();
155+
clear(q);
156156
for (CombinerSubscriber<T, R> s : subscribers) {
157157
s.cancel();
158158
}
159159
}
160-
161-
Object last = "1";
160+
161+
void clear(Queue<?> q) {
162+
synchronized (this) {
163+
Arrays.fill(latest, null);
164+
}
165+
q.clear();
166+
}
162167

163168
void combine(T value, int index) {
164169
CombinerSubscriber<T, R> cs = subscribers[index];
@@ -169,6 +174,9 @@ void combine(T value, int index) {
169174
boolean empty;
170175
boolean f;
171176
synchronized (this) {
177+
if (cancelled) {
178+
return;
179+
}
172180
len = latest.length;
173181
Object o = latest[index];
174182
a = active;
@@ -287,6 +295,7 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a, Queue<?> q, b
287295
if (d) {
288296
if (delayError) {
289297
if (empty) {
298+
clear(queue);
290299
Throwable e = error;
291300
if (e != null) {
292301
a.onError(e);
@@ -303,6 +312,7 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a, Queue<?> q, b
303312
return true;
304313
} else
305314
if (empty) {
315+
clear(queue);
306316
a.onComplete();
307317
return true;
308318
}

src/main/java/io/reactivex/internal/operators/nbp/NbpOnSubscribeCombineLatest.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.reactivex.internal.operators.nbp;
1515

16-
import java.util.Queue;
16+
import java.util.*;
1717
import java.util.concurrent.atomic.*;
1818
import java.util.function.Function;
1919

@@ -139,13 +139,18 @@ public void dispose() {
139139
}
140140

141141
void cancel(Queue<?> q) {
142-
q.clear();
142+
clear(q);
143143
for (CombinerSubscriber<T, R> s : subscribers) {
144144
s.dispose();
145145
}
146146
}
147147

148-
Object last = "1";
148+
void clear(Queue<?> q) {
149+
synchronized (this) {
150+
Arrays.fill(latest, null);
151+
}
152+
q.clear();
153+
}
149154

150155
void combine(T value, int index) {
151156
CombinerSubscriber<T, R> cs = subscribers[index];
@@ -156,6 +161,9 @@ void combine(T value, int index) {
156161
boolean empty;
157162
boolean f;
158163
synchronized (this) {
164+
if (cancelled) {
165+
return;
166+
}
159167
len = latest.length;
160168
Object o = latest[index];
161169
a = active;
@@ -258,6 +266,7 @@ boolean checkTerminated(boolean d, boolean empty, NbpSubscriber<?> a, Queue<?> q
258266
if (d) {
259267
if (delayError) {
260268
if (empty) {
269+
clear(queue);
261270
Throwable e = error;
262271
if (e != null) {
263272
a.onError(e);
@@ -274,6 +283,7 @@ boolean checkTerminated(boolean d, boolean empty, NbpSubscriber<?> a, Queue<?> q
274283
return true;
275284
} else
276285
if (empty) {
286+
clear(queue);
277287
a.onComplete();
278288
return true;
279289
}

0 commit comments

Comments
 (0)