Skip to content

Commit 1ede607

Browse files
authored
3.x: Use more appropriate operators when delegating to Flowable ops (#6888)
* 3.x: Use more appropriate operators when delegating to Flowable ops * Remove now-unused classes
1 parent 2d8660e commit 1ede607

File tree

10 files changed

+173
-96
lines changed

10 files changed

+173
-96
lines changed

src/main/java/io/reactivex/rxjava3/core/Maybe.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -344,11 +344,10 @@ public static <T> Flowable<T> concat(@NonNull Publisher<@NonNull ? extends Maybe
344344
@CheckReturnValue
345345
@NonNull
346346
@SchedulerSupport(SchedulerSupport.NONE)
347-
@SuppressWarnings({ "unchecked", "rawtypes" })
348347
public static <T> Flowable<T> concat(@NonNull Publisher<@NonNull ? extends MaybeSource<? extends T>> sources, int prefetch) {
349348
Objects.requireNonNull(sources, "sources is null");
350349
ObjectHelper.verifyPositive(prefetch, "prefetch");
351-
return RxJavaPlugins.onAssembly(new FlowableConcatMapPublisher(sources, MaybeToPublisher.instance(), prefetch, ErrorMode.IMMEDIATE));
350+
return RxJavaPlugins.onAssembly(new FlowableConcatMapMaybePublisher<>(sources, Functions.identity(), ErrorMode.IMMEDIATE, prefetch));
352351
}
353352

354353
/**
@@ -1141,7 +1140,7 @@ public static <T> Maybe<T> fromRunnable(@NonNull Runnable run) {
11411140
@SchedulerSupport(SchedulerSupport.NONE)
11421141
@NonNull
11431142
public static <T> Flowable<T> merge(@NonNull Iterable<@NonNull ? extends MaybeSource<? extends T>> sources) {
1144-
return merge(Flowable.fromIterable(sources));
1143+
return Flowable.fromIterable(sources).flatMapMaybe(Functions.identity(), false, Integer.MAX_VALUE);
11451144
}
11461145

11471146
/**
@@ -1218,11 +1217,10 @@ public static <T> Flowable<T> merge(@NonNull Publisher<@NonNull ? extends MaybeS
12181217
@CheckReturnValue
12191218
@NonNull
12201219
@SchedulerSupport(SchedulerSupport.NONE)
1221-
@SuppressWarnings({ "unchecked", "rawtypes" })
12221220
public static <T> Flowable<T> merge(@NonNull Publisher<@NonNull ? extends MaybeSource<? extends T>> sources, int maxConcurrency) {
12231221
Objects.requireNonNull(sources, "sources is null");
12241222
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
1225-
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), false, maxConcurrency, 1));
1223+
return RxJavaPlugins.onAssembly(new FlowableFlatMapMaybePublisher<>(sources, Functions.identity(), false, maxConcurrency));
12261224
}
12271225

12281226
/**
@@ -1490,18 +1488,14 @@ public static <T> Flowable<T> mergeArray(MaybeSource<? extends T>... sources) {
14901488
* @throws NullPointerException if {@code sources} is {@code null}
14911489
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
14921490
*/
1493-
@SuppressWarnings({ "unchecked", "rawtypes" })
14941491
@BackpressureSupport(BackpressureKind.FULL)
14951492
@CheckReturnValue
14961493
@SchedulerSupport(SchedulerSupport.NONE)
14971494
@SafeVarargs
14981495
@NonNull
14991496
public static <T> Flowable<T> mergeArrayDelayError(@NonNull MaybeSource<? extends T>... sources) {
15001497
Objects.requireNonNull(sources, "sources is null");
1501-
if (sources.length == 0) {
1502-
return Flowable.empty();
1503-
}
1504-
return Flowable.fromArray(sources).flatMap((Function)MaybeToPublisher.instance(), true, sources.length);
1498+
return Flowable.fromArray(sources).flatMapMaybe(Functions.identity(), true, Math.max(1, sources.length));
15051499
}
15061500

15071501
/**
@@ -1533,13 +1527,12 @@ public static <T> Flowable<T> mergeArrayDelayError(@NonNull MaybeSource<? extend
15331527
* @throws NullPointerException if {@code sources} is {@code null}
15341528
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
15351529
*/
1536-
@SuppressWarnings({ "unchecked", "rawtypes" })
15371530
@BackpressureSupport(BackpressureKind.FULL)
15381531
@CheckReturnValue
15391532
@SchedulerSupport(SchedulerSupport.NONE)
15401533
@NonNull
15411534
public static <T> Flowable<T> mergeDelayError(@NonNull Iterable<@NonNull ? extends MaybeSource<? extends T>> sources) {
1542-
return Flowable.fromIterable(sources).flatMap((Function)MaybeToPublisher.instance(), true);
1535+
return Flowable.fromIterable(sources).flatMapMaybe(Functions.identity(), true, Integer.MAX_VALUE);
15431536
}
15441537

15451538
/**
@@ -1609,15 +1602,14 @@ public static <T> Flowable<T> mergeDelayError(@NonNull Publisher<@NonNull ? exte
16091602
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
16101603
* @since 2.2
16111604
*/
1612-
@SuppressWarnings({ "unchecked", "rawtypes" })
16131605
@BackpressureSupport(BackpressureKind.FULL)
16141606
@CheckReturnValue
16151607
@NonNull
16161608
@SchedulerSupport(SchedulerSupport.NONE)
16171609
public static <T> Flowable<T> mergeDelayError(@NonNull Publisher<@NonNull ? extends MaybeSource<? extends T>> sources, int maxConcurrency) {
16181610
Objects.requireNonNull(sources, "sources is null");
16191611
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
1620-
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), true, maxConcurrency, 1));
1612+
return RxJavaPlugins.onAssembly(new FlowableFlatMapMaybePublisher<>(sources, Functions.identity(), true, maxConcurrency));
16211613
}
16221614

16231615
/**

src/main/java/io/reactivex/rxjava3/core/Single.java

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.reactivestreams.*;
2121

2222
import io.reactivex.rxjava3.annotations.*;
23-
import io.reactivex.rxjava3.core.Observable;
2423
import io.reactivex.rxjava3.disposables.Disposable;
2524
import io.reactivex.rxjava3.exceptions.*;
2625
import io.reactivex.rxjava3.functions.*;
@@ -32,7 +31,7 @@
3231
import io.reactivex.rxjava3.internal.operators.flowable.*;
3332
import io.reactivex.rxjava3.internal.operators.maybe.*;
3433
import io.reactivex.rxjava3.internal.operators.mixed.*;
35-
import io.reactivex.rxjava3.internal.operators.observable.*;
34+
import io.reactivex.rxjava3.internal.operators.observable.ObservableSingleSingle;
3635
import io.reactivex.rxjava3.internal.operators.single.*;
3736
import io.reactivex.rxjava3.internal.util.ErrorMode;
3837
import io.reactivex.rxjava3.observers.TestObserver;
@@ -195,7 +194,7 @@ public static <T> Single<T> ambArray(@NonNull SingleSource<? extends T>... sourc
195194
@SchedulerSupport(SchedulerSupport.NONE)
196195
@BackpressureSupport(BackpressureKind.FULL)
197196
public static <T> Flowable<T> concat(@NonNull Iterable<@NonNull ? extends SingleSource<? extends T>> sources) {
198-
return concat(Flowable.fromIterable(sources));
197+
return Flowable.fromIterable(sources).concatMapSingleDelayError(Functions.identity(), false);
199198
}
200199

201200
/**
@@ -216,10 +215,9 @@ public static <T> Flowable<T> concat(@NonNull Iterable<@NonNull ? extends Single
216215
@CheckReturnValue
217216
@NonNull
218217
@SchedulerSupport(SchedulerSupport.NONE)
219-
@SuppressWarnings({ "unchecked", "rawtypes" })
220218
public static <T> Observable<T> concat(@NonNull ObservableSource<? extends SingleSource<? extends T>> sources) {
221219
Objects.requireNonNull(sources, "sources is null");
222-
return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, SingleInternalHelper.toObservable(), 2, ErrorMode.IMMEDIATE));
220+
return RxJavaPlugins.onAssembly(new ObservableConcatMapSingle<>(sources, Functions.identity(), ErrorMode.IMMEDIATE, 2));
223221
}
224222

225223
/**
@@ -272,11 +270,10 @@ public static <T> Flowable<T> concat(@NonNull Publisher<@NonNull ? extends Singl
272270
@NonNull
273271
@BackpressureSupport(BackpressureKind.FULL)
274272
@SchedulerSupport(SchedulerSupport.NONE)
275-
@SuppressWarnings({ "unchecked", "rawtypes" })
276273
public static <T> Flowable<T> concat(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources, int prefetch) {
277274
Objects.requireNonNull(sources, "sources is null");
278275
ObjectHelper.verifyPositive(prefetch, "prefetch");
279-
return RxJavaPlugins.onAssembly(new FlowableConcatMapPublisher(sources, SingleInternalHelper.toFlowable(), prefetch, ErrorMode.IMMEDIATE));
276+
return RxJavaPlugins.onAssembly(new FlowableConcatMapSinglePublisher<>(sources, Functions.identity(), ErrorMode.IMMEDIATE, prefetch));
280277
}
281278

282279
/**
@@ -308,7 +305,7 @@ public static <T> Flowable<T> concat(
308305
) {
309306
Objects.requireNonNull(source1, "source1 is null");
310307
Objects.requireNonNull(source2, "source2 is null");
311-
return concat(Flowable.fromArray(source1, source2));
308+
return Flowable.fromArray(source1, source2).concatMapSingleDelayError(Functions.identity(), false);
312309
}
313310

314311
/**
@@ -344,7 +341,7 @@ public static <T> Flowable<T> concat(
344341
Objects.requireNonNull(source1, "source1 is null");
345342
Objects.requireNonNull(source2, "source2 is null");
346343
Objects.requireNonNull(source3, "source3 is null");
347-
return concat(Flowable.fromArray(source1, source2, source3));
344+
return Flowable.fromArray(source1, source2, source3).concatMapSingleDelayError(Functions.identity(), false);
348345
}
349346

350347
/**
@@ -383,7 +380,7 @@ public static <T> Flowable<T> concat(
383380
Objects.requireNonNull(source2, "source2 is null");
384381
Objects.requireNonNull(source3, "source3 is null");
385382
Objects.requireNonNull(source4, "source4 is null");
386-
return concat(Flowable.fromArray(source1, source2, source3, source4));
383+
return Flowable.fromArray(source1, source2, source3, source4).concatMapSingleDelayError(Functions.identity(), false);
387384
}
388385

389386
/**
@@ -409,7 +406,7 @@ public static <T> Flowable<T> concat(
409406
@SchedulerSupport(SchedulerSupport.NONE)
410407
@SafeVarargs
411408
public static <T> Flowable<T> concatArray(@NonNull SingleSource<? extends T>... sources) {
412-
return Flowable.fromArray(sources).concatMap(SingleInternalHelper.toFlowable(), 2);
409+
return Flowable.fromArray(sources).concatMapSingleDelayError(Functions.identity(), false);
413410
}
414411

415412
/**
@@ -435,7 +432,7 @@ public static <T> Flowable<T> concatArray(@NonNull SingleSource<? extends T>...
435432
@SchedulerSupport(SchedulerSupport.NONE)
436433
@SafeVarargs
437434
public static <T> Flowable<T> concatArrayDelayError(@NonNull SingleSource<? extends T>... sources) {
438-
return Flowable.fromArray(sources).concatMapDelayError(SingleInternalHelper.toFlowable(), true, 2);
435+
return Flowable.fromArray(sources).concatMapSingleDelayError(Functions.identity(), true);
439436
}
440437

441438
/**
@@ -1091,7 +1088,7 @@ public static <T> Single<T> fromObservable(@NonNull ObservableSource<? extends T
10911088
@BackpressureSupport(BackpressureKind.FULL)
10921089
@SchedulerSupport(SchedulerSupport.NONE)
10931090
public static <T> Flowable<T> merge(@NonNull Iterable<@NonNull ? extends SingleSource<? extends T>> sources) {
1094-
return merge(Flowable.fromIterable(sources));
1091+
return Flowable.fromIterable(sources).flatMapSingle(Functions.identity());
10951092
}
10961093

10971094
/**
@@ -1129,10 +1126,9 @@ public static <T> Flowable<T> merge(@NonNull Iterable<@NonNull ? extends SingleS
11291126
@NonNull
11301127
@BackpressureSupport(BackpressureKind.FULL)
11311128
@SchedulerSupport(SchedulerSupport.NONE)
1132-
@SuppressWarnings({ "unchecked", "rawtypes" })
11331129
public static <T> Flowable<T> merge(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources) {
11341130
Objects.requireNonNull(sources, "sources is null");
1135-
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, SingleInternalHelper.toFlowable(), false, Integer.MAX_VALUE, Flowable.bufferSize()));
1131+
return RxJavaPlugins.onAssembly(new FlowableFlatMapSinglePublisher<>(sources, Functions.identity(), false, Integer.MAX_VALUE));
11361132
}
11371133

11381134
/**
@@ -1212,7 +1208,7 @@ public static <T> Flowable<T> merge(
12121208
) {
12131209
Objects.requireNonNull(source1, "source1 is null");
12141210
Objects.requireNonNull(source2, "source2 is null");
1215-
return merge(Flowable.fromArray(source1, source2));
1211+
return Flowable.fromArray(source1, source2).flatMapSingle(Functions.identity(), false, Integer.MAX_VALUE);
12161212
}
12171213

12181214
/**
@@ -1265,7 +1261,7 @@ public static <T> Flowable<T> merge(
12651261
Objects.requireNonNull(source1, "source1 is null");
12661262
Objects.requireNonNull(source2, "source2 is null");
12671263
Objects.requireNonNull(source3, "source3 is null");
1268-
return merge(Flowable.fromArray(source1, source2, source3));
1264+
return Flowable.fromArray(source1, source2, source3).flatMapSingle(Functions.identity(), false, Integer.MAX_VALUE);
12691265
}
12701266

12711267
/**
@@ -1321,7 +1317,7 @@ public static <T> Flowable<T> merge(
13211317
Objects.requireNonNull(source2, "source2 is null");
13221318
Objects.requireNonNull(source3, "source3 is null");
13231319
Objects.requireNonNull(source4, "source4 is null");
1324-
return merge(Flowable.fromArray(source1, source2, source3, source4));
1320+
return Flowable.fromArray(source1, source2, source3, source4).flatMapSingle(Functions.identity(), false, Integer.MAX_VALUE);
13251321
}
13261322

13271323
/**
@@ -1360,7 +1356,7 @@ public static <T> Flowable<T> merge(
13601356
@SchedulerSupport(SchedulerSupport.NONE)
13611357
@SafeVarargs
13621358
public static <T> Flowable<T> mergeArray(SingleSource<? extends T>... sources) {
1363-
return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), false, sources.length);
1359+
return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), false, Math.max(1, sources.length));
13641360
}
13651361

13661362
/**
@@ -1396,7 +1392,7 @@ public static <T> Flowable<T> mergeArray(SingleSource<? extends T>... sources) {
13961392
@SafeVarargs
13971393
@NonNull
13981394
public static <T> Flowable<T> mergeArrayDelayError(@NonNull SingleSource<? extends T>... sources) {
1399-
return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), true, sources.length);
1395+
return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), true, Math.max(1, sources.length));
14001396
}
14011397

14021398
/**
@@ -1423,7 +1419,7 @@ public static <T> Flowable<T> mergeArrayDelayError(@NonNull SingleSource<? exten
14231419
@BackpressureSupport(BackpressureKind.FULL)
14241420
@SchedulerSupport(SchedulerSupport.NONE)
14251421
public static <T> Flowable<T> mergeDelayError(@NonNull Iterable<@NonNull ? extends SingleSource<? extends T>> sources) {
1426-
return mergeDelayError(Flowable.fromIterable(sources));
1422+
return Flowable.fromIterable(sources).flatMapSingle(Functions.identity(), true, Integer.MAX_VALUE);
14271423
}
14281424

14291425
/**
@@ -1449,10 +1445,9 @@ public static <T> Flowable<T> mergeDelayError(@NonNull Iterable<@NonNull ? exten
14491445
@NonNull
14501446
@BackpressureSupport(BackpressureKind.FULL)
14511447
@SchedulerSupport(SchedulerSupport.NONE)
1452-
@SuppressWarnings({ "unchecked", "rawtypes" })
14531448
public static <T> Flowable<T> mergeDelayError(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources) {
14541449
Objects.requireNonNull(sources, "sources is null");
1455-
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, SingleInternalHelper.toFlowable(), true, Integer.MAX_VALUE, Flowable.bufferSize()));
1450+
return RxJavaPlugins.onAssembly(new FlowableFlatMapSinglePublisher<>(sources, Functions.identity(), true, Integer.MAX_VALUE));
14561451
}
14571452

14581453
/**
@@ -1490,7 +1485,7 @@ public static <T> Flowable<T> mergeDelayError(
14901485
) {
14911486
Objects.requireNonNull(source1, "source1 is null");
14921487
Objects.requireNonNull(source2, "source2 is null");
1493-
return mergeDelayError(Flowable.fromArray(source1, source2));
1488+
return Flowable.fromArray(source1, source2).flatMapSingle(Functions.identity(), true, Integer.MAX_VALUE);
14941489
}
14951490

14961491
/**
@@ -1532,7 +1527,7 @@ public static <T> Flowable<T> mergeDelayError(
15321527
Objects.requireNonNull(source1, "source1 is null");
15331528
Objects.requireNonNull(source2, "source2 is null");
15341529
Objects.requireNonNull(source3, "source3 is null");
1535-
return mergeDelayError(Flowable.fromArray(source1, source2, source3));
1530+
return Flowable.fromArray(source1, source2, source3).flatMapSingle(Functions.identity(), true, Integer.MAX_VALUE);
15361531
}
15371532

15381533
/**
@@ -1577,7 +1572,7 @@ public static <T> Flowable<T> mergeDelayError(
15771572
Objects.requireNonNull(source2, "source2 is null");
15781573
Objects.requireNonNull(source3, "source3 is null");
15791574
Objects.requireNonNull(source4, "source4 is null");
1580-
return mergeDelayError(Flowable.fromArray(source1, source2, source3, source4));
1575+
return Flowable.fromArray(source1, source2, source3, source4).flatMapSingle(Functions.identity(), true, Integer.MAX_VALUE);
15811576
}
15821577

15831578
/**

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapPublisher.java renamed to src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapMaybePublisher.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,35 @@
1515

1616
import org.reactivestreams.*;
1717

18-
import io.reactivex.rxjava3.core.Flowable;
18+
import io.reactivex.rxjava3.core.*;
1919
import io.reactivex.rxjava3.functions.Function;
20+
import io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMapMaybe.FlatMapMaybeSubscriber;
21+
22+
/**
23+
* Maps upstream values into MaybeSources and merges their signals into one sequence.
24+
* @param <T> the source value type
25+
* @param <R> the result value type
26+
*/
27+
public final class FlowableFlatMapMaybePublisher<T, R> extends Flowable<R> {
2028

21-
public final class FlowableFlatMapPublisher<T, U> extends Flowable<U> {
2229
final Publisher<T> source;
23-
final Function<? super T, ? extends Publisher<? extends U>> mapper;
30+
31+
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
32+
2433
final boolean delayErrors;
34+
2535
final int maxConcurrency;
26-
final int bufferSize;
2736

28-
public FlowableFlatMapPublisher(Publisher<T> source,
29-
Function<? super T, ? extends Publisher<? extends U>> mapper,
30-
boolean delayErrors, int maxConcurrency, int bufferSize) {
37+
public FlowableFlatMapMaybePublisher(Publisher<T> source, Function<? super T, ? extends MaybeSource<? extends R>> mapper,
38+
boolean delayError, int maxConcurrency) {
3139
this.source = source;
3240
this.mapper = mapper;
33-
this.delayErrors = delayErrors;
41+
this.delayErrors = delayError;
3442
this.maxConcurrency = maxConcurrency;
35-
this.bufferSize = bufferSize;
3643
}
3744

3845
@Override
39-
protected void subscribeActual(Subscriber<? super U> s) {
40-
if (FlowableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) {
41-
return;
42-
}
43-
source.subscribe(FlowableFlatMap.subscribe(s, mapper, delayErrors, maxConcurrency, bufferSize));
46+
protected void subscribeActual(Subscriber<? super R> s) {
47+
source.subscribe(new FlatMapMaybeSubscriber<>(s, mapper, delayErrors, maxConcurrency));
4448
}
4549
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.operators.flowable;
15+
16+
import org.reactivestreams.*;
17+
18+
import io.reactivex.rxjava3.core.*;
19+
import io.reactivex.rxjava3.functions.Function;
20+
import io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMapSingle.FlatMapSingleSubscriber;
21+
22+
/**
23+
* Maps upstream values into SingleSources and merges their signals into one sequence.
24+
* @param <T> the source value type
25+
* @param <R> the result value type
26+
*/
27+
public final class FlowableFlatMapSinglePublisher<T, R> extends Flowable<R> {
28+
29+
final Publisher<T> source;
30+
31+
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
32+
33+
final boolean delayErrors;
34+
35+
final int maxConcurrency;
36+
37+
public FlowableFlatMapSinglePublisher(Publisher<T> source, Function<? super T, ? extends SingleSource<? extends R>> mapper,
38+
boolean delayError, int maxConcurrency) {
39+
this.source = source;
40+
this.mapper = mapper;
41+
this.delayErrors = delayError;
42+
this.maxConcurrency = maxConcurrency;
43+
}
44+
45+
@Override
46+
protected void subscribeActual(Subscriber<? super R> s) {
47+
source.subscribe(new FlatMapSingleSubscriber<>(s, mapper, delayErrors, maxConcurrency));
48+
}
49+
}

0 commit comments

Comments
 (0)