diff --git a/README.md b/README.md
index 2a0f1f3dec..e8e37a8c41 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@
[ ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.2.1)
Library support for Kotlin coroutines with [multiplatform](#multiplatform) support.
-This is a companion version for Kotlin `1.3.30` release.
+This is a companion version for Kotlin `1.3.31` release.
```kotlin
suspend fun main() = coroutineScope {
@@ -89,7 +89,7 @@ And make sure that you use the latest Kotlin version:
```xml
- 1.3.30
+ 1.3.31
```
@@ -107,7 +107,7 @@ And make sure that you use the latest Kotlin version:
```groovy
buildscript {
- ext.kotlin_version = '1.3.30'
+ ext.kotlin_version = '1.3.31'
}
```
@@ -133,7 +133,7 @@ And make sure that you use the latest Kotlin version:
```groovy
plugins {
- kotlin("jvm") version "1.3.30"
+ kotlin("jvm") version "1.3.31"
}
```
diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle
index 728804ad4b..157eb88aa6 100644
--- a/benchmarks/build.gradle
+++ b/benchmarks/build.gradle
@@ -1,6 +1,8 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
+sourceCompatibility = 1.8
+targetCompatibility = 1.8
apply plugin: "net.ltgt.apt"
apply plugin: "com.github.johnrengelman.shadow"
@@ -10,15 +12,50 @@ repositories {
maven { url "https://repo.typesafe.com/typesafe/releases/" }
}
-jmh.jmhVersion = '1.21'
+compileJmhKotlin {
+ kotlinOptions {
+ jvmTarget = "1.8"
+ freeCompilerArgs += ['-Xjvm-default=enable']
+ }
+}
+
+/*
+ * Due to a bug in the inliner it sometimes does not remove inlined symbols (that are later renamed) from unused code paths,
+ * and it breaks JMH that tries to post-process these symbols and fails because they are renamed.
+ */
+task removeRedundantFiles(type: Delete) {
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class"
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$1.class"
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$1.class"
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$1.class"
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$2\$1.class"
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$2\$1.class"
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$2\$1.class"
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$1\$1.class"
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$2\$1.class"
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class"
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class"
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class"
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble//SaneFlowPlaysScrabble\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class"
+
+ // Primes
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$2\$1.class"
+ delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$1.class"
+}
+
+jmhRunBytecodeGenerator.dependsOn(removeRedundantFiles)
// It is better to use the following to run benchmarks, otherwise you may get unexpected errors:
-// ../gradlew --no-daemon cleanJmhJar jmh
+// ./gradlew --no-daemon cleanJmhJar jmh -Pjmh="MyBenchmark"
jmh {
+ jmhVersion = '1.21'
duplicateClassesStrategy DuplicatesStrategy.INCLUDE
failOnError = true
resultFormat = 'CSV'
-// include = ['.*ChannelProducerConsumer.*']
+ if (project.hasProperty('jmh')) {
+ include = ".*" + project.jmh + ".*"
+ }
+// includeTests = false
}
jmhJar {
@@ -29,8 +66,12 @@ jmhJar {
}
dependencies {
+ compile "org.openjdk.jmh:jmh-core:1.21"
+ compile "io.projectreactor:reactor-core:$reactor_vesion"
+ compile 'io.reactivex.rxjava2:rxjava:2.1.9'
+ compile "com.github.akarnokd:rxjava2-extensions:0.20.8"
+
compile "org.openjdk.jmh:jmh-core:1.21"
compile 'com.typesafe.akka:akka-actor_2.12:2.5.0'
compile project(':kotlinx-coroutines-core')
}
-
diff --git a/benchmarks/src/jmh/java/benchmarks/flow/scrabble/RxJava2PlaysScrabble.java b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/RxJava2PlaysScrabble.java
new file mode 100644
index 0000000000..2a85d0dbdd
--- /dev/null
+++ b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/RxJava2PlaysScrabble.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow.scrabble;
+
+import benchmarks.flow.scrabble.IterableSpliterator;
+import benchmarks.flow.scrabble.ShakespearePlaysScrabble;
+import io.reactivex.Flowable;
+import io.reactivex.Maybe;
+import io.reactivex.Single;
+import io.reactivex.functions.Function;
+import org.openjdk.jmh.annotations.*;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Shakespeare plays Scrabble with RxJava 2 Flowable.
+ * @author José
+ * @author akarnokd
+ */
+@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+public class RxJava2PlaysScrabble extends ShakespearePlaysScrabble {
+
+ @Benchmark
+ @Override
+ public List>> play() throws Exception {
+
+ // Function to compute the score of a given word
+ Function> scoreOfALetter = letter -> Flowable.just(letterScores[letter - 'a']) ;
+
+ // score of the same letters in a word
+ Function, Flowable> letterScore =
+ entry ->
+ Flowable.just(
+ letterScores[entry.getKey() - 'a'] *
+ Integer.min(
+ (int)entry.getValue().get(),
+ scrabbleAvailableLetters[entry.getKey() - 'a']
+ )
+ ) ;
+
+ Function> toIntegerFlowable =
+ string -> Flowable.fromIterable(IterableSpliterator.of(string.chars().boxed().spliterator())) ;
+
+ // Histogram of the letters in a given word
+ Function>> histoOfLetters =
+ word -> toIntegerFlowable.apply(word)
+ .collect(
+ () -> new HashMap<>(),
+ (HashMap map, Integer value) ->
+ {
+ LongWrapper newValue = map.get(value) ;
+ if (newValue == null) {
+ newValue = () -> 0L ;
+ }
+ map.put(value, newValue.incAndSet()) ;
+ }
+
+ ) ;
+
+ // number of blanks for a given letter
+ Function, Flowable> blank =
+ entry ->
+ Flowable.just(
+ Long.max(
+ 0L,
+ entry.getValue().get() -
+ scrabbleAvailableLetters[entry.getKey() - 'a']
+ )
+ ) ;
+
+ // number of blanks for a given word
+ Function> nBlanks =
+ word -> histoOfLetters.apply(word)
+ .flatMapPublisher(map -> Flowable.fromIterable(() -> map.entrySet().iterator()))
+ .flatMap(blank)
+ .reduce(Long::sum) ;
+
+
+ // can a word be written with 2 blanks?
+ Function> checkBlanks =
+ word -> nBlanks.apply(word)
+ .flatMap(l -> Maybe.just(l <= 2L)) ;
+
+ // score taking blanks into account letterScore1
+ Function> score2 =
+ word -> histoOfLetters.apply(word)
+ .flatMapPublisher(map -> Flowable.fromIterable(() -> map.entrySet().iterator()))
+ .flatMap(letterScore)
+ .reduce(Integer::sum) ;
+
+ // Placing the word on the board
+ // Building the streams of first and last letters
+ Function> first3 =
+ word -> Flowable.fromIterable(IterableSpliterator.of(word.chars().boxed().limit(3).spliterator())) ;
+ Function> last3 =
+ word -> Flowable.fromIterable(IterableSpliterator.of(word.chars().boxed().skip(3).spliterator())) ;
+
+
+ // Stream to be maxed
+ Function> toBeMaxed =
+ word -> Flowable.just(first3.apply(word), last3.apply(word))
+ .flatMap(observable -> observable) ;
+
+ // Bonus for double letter
+ Function> bonusForDoubleLetter =
+ word -> toBeMaxed.apply(word)
+ .flatMap(scoreOfALetter)
+ .reduce(Integer::max) ;
+
+ // score of the word put on the board
+ Function> score3 =
+ word ->
+ Maybe.merge(Arrays.asList(
+ score2.apply(word),
+ score2.apply(word),
+ bonusForDoubleLetter.apply(word),
+ bonusForDoubleLetter.apply(word),
+ Maybe.just(word.length() == 7 ? 50 : 0)
+ )
+ )
+ .reduce(Integer::sum) ;
+
+ Function>, Single>>> buildHistoOnScore =
+ score -> Flowable.fromIterable(() -> shakespeareWords.iterator())
+ .filter(scrabbleWords::contains)
+ .filter(word -> checkBlanks.apply(word).blockingGet())
+ .collect(
+ () -> new TreeMap<>(Comparator.reverseOrder()),
+ (TreeMap> map, String word) -> {
+ Integer key = score.apply(word).blockingGet() ;
+ List list = map.get(key) ;
+ if (list == null) {
+ list = new ArrayList<>() ;
+ map.put(key, list) ;
+ }
+ list.add(word) ;
+ }
+ ) ;
+
+ // best key / value pairs
+ List>> finalList2 =
+ buildHistoOnScore.apply(score3)
+ .flatMapPublisher(map -> Flowable.fromIterable(() -> map.entrySet().iterator()))
+ .take(3)
+ .collect(
+ () -> new ArrayList>>(),
+ (list, entry) -> {
+ list.add(entry) ;
+ }
+ )
+ .blockingGet() ;
+ return finalList2 ;
+ }
+}
\ No newline at end of file
diff --git a/benchmarks/src/jmh/java/benchmarks/flow/scrabble/RxJava2PlaysScrabbleOpt.java b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/RxJava2PlaysScrabbleOpt.java
new file mode 100644
index 0000000000..7a7cb1aa4e
--- /dev/null
+++ b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/RxJava2PlaysScrabbleOpt.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow.scrabble;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import hu.akarnokd.rxjava2.math.MathFlowable;
+import org.openjdk.jmh.annotations.*;
+import benchmarks.flow.scrabble.optimizations.*;
+import io.reactivex.*;
+import io.reactivex.functions.Function;
+
+/**
+ * Shakespeare plays Scrabble with RxJava 2 Flowable optimized.
+ * @author José
+ * @author akarnokd
+ */
+@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+public class RxJava2PlaysScrabbleOpt extends ShakespearePlaysScrabble {
+ static Flowable chars(String word) {
+// return Flowable.range(0, word.length()).map(i -> (int)word.charAt(i));
+ return StringFlowable.characters(word);
+ }
+
+ @Benchmark
+ @Override
+ public List>> play() throws Exception {
+
+ // to compute the score of a given word
+ Function scoreOfALetter = letter -> letterScores[letter - 'a'];
+
+ // score of the same letters in a word
+ Function, Integer> letterScore =
+ entry ->
+ letterScores[entry.getKey() - 'a'] *
+ Integer.min(
+ (int)entry.getValue().get(),
+ scrabbleAvailableLetters[entry.getKey() - 'a']
+ )
+ ;
+
+
+ Function> toIntegerFlowable =
+ string -> chars(string);
+
+ Map>> histoCache = new HashMap<>();
+ // Histogram of the letters in a given word
+ Function>> histoOfLetters =
+ word -> { Single> s = histoCache.get(word);
+ if (s == null) {
+ s = toIntegerFlowable.apply(word)
+ .collect(
+ () -> new HashMap<>(),
+ (HashMap map, Integer value) ->
+ {
+ MutableLong newValue = map.get(value) ;
+ if (newValue == null) {
+ newValue = new MutableLong();
+ map.put(value, newValue);
+ }
+ newValue.incAndSet();
+ }
+
+ );
+ histoCache.put(word, s);
+ }
+ return s;
+ };
+
+ // number of blanks for a given letter
+ Function, Long> blank =
+ entry ->
+ Long.max(
+ 0L,
+ entry.getValue().get() -
+ scrabbleAvailableLetters[entry.getKey() - 'a']
+ )
+ ;
+
+ // number of blanks for a given word
+ Function> nBlanks =
+ word -> MathFlowable.sumLong(
+ histoOfLetters.apply(word).flattenAsFlowable(
+ map -> map.entrySet()
+ )
+ .map(blank)
+ )
+ ;
+
+
+ // can a word be written with 2 blanks?
+ Function> checkBlanks =
+ word -> nBlanks.apply(word)
+ .map(l -> l <= 2L) ;
+
+ // score taking blanks into account letterScore1
+ Function> score2 =
+ word -> MathFlowable.sumInt(
+ histoOfLetters.apply(word).flattenAsFlowable(
+ map -> map.entrySet()
+ )
+ .map(letterScore)
+ ) ;
+
+ // Placing the word on the board
+ // Building the streams of first and last letters
+ Function> first3 =
+ word -> chars(word).take(3) ;
+ Function> last3 =
+ word -> chars(word).skip(3) ;
+
+
+ // Stream to be maxed
+ Function> toBeMaxed =
+ word -> Flowable.concat(first3.apply(word), last3.apply(word))
+ ;
+
+ // Bonus for double letter
+ Function> bonusForDoubleLetter =
+ word -> MathFlowable.max(toBeMaxed.apply(word)
+ .map(scoreOfALetter)
+ ) ;
+
+ // score of the word put on the board
+ Function> score3 =
+ word ->
+ MathFlowable.sumInt(Flowable.concat(
+ score2.apply(word),
+ bonusForDoubleLetter.apply(word)
+ )).map(v -> v * 2 + (word.length() == 7 ? 50 : 0));
+
+ Function>, Single>>> buildHistoOnScore =
+ score -> Flowable.fromIterable(shakespeareWords)
+ .filter(scrabbleWords::contains)
+ .filter(word -> checkBlanks.apply(word).blockingFirst())
+ .collect(
+ () -> new TreeMap>(Comparator.reverseOrder()),
+ (TreeMap> map, String word) -> {
+ Integer key = score.apply(word).blockingFirst() ;
+ List list = map.get(key) ;
+ if (list == null) {
+ list = new ArrayList<>() ;
+ map.put(key, list) ;
+ }
+ list.add(word) ;
+ }
+ ) ;
+
+ // best key / value pairs
+ List>> finalList2 =
+ buildHistoOnScore.apply(score3).flattenAsFlowable(
+ map -> map.entrySet()
+ )
+ .take(3)
+ .collect(
+ () -> new ArrayList>>(),
+ (list, entry) -> {
+ list.add(entry) ;
+ }
+ )
+ .blockingGet();
+
+ return finalList2 ;
+ }
+}
\ No newline at end of file
diff --git a/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/FlowableCharSequence.java b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/FlowableCharSequence.java
new file mode 100644
index 0000000000..a45dbdd2c5
--- /dev/null
+++ b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/FlowableCharSequence.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow.scrabble.optimizations;
+
+import io.reactivex.Flowable;
+import io.reactivex.internal.fuseable.QueueFuseable;
+import io.reactivex.internal.subscriptions.BasicQueueSubscription;
+import io.reactivex.internal.subscriptions.SubscriptionHelper;
+import io.reactivex.internal.util.BackpressureHelper;
+import org.reactivestreams.Subscriber;
+
+final class FlowableCharSequence extends Flowable {
+
+ final CharSequence string;
+
+ FlowableCharSequence(CharSequence string) {
+ this.string = string;
+ }
+
+ @Override
+ public void subscribeActual(Subscriber super Integer> s) {
+ s.onSubscribe(new CharSequenceSubscription(s, string));
+ }
+
+ static final class CharSequenceSubscription
+ extends BasicQueueSubscription {
+
+ private static final long serialVersionUID = -4593793201463047197L;
+
+ final Subscriber super Integer> downstream;
+
+ final CharSequence string;
+
+ final int end;
+
+ int index;
+
+ volatile boolean cancelled;
+
+ CharSequenceSubscription(Subscriber super Integer> downstream, CharSequence string) {
+ this.downstream = downstream;
+ this.string = string;
+ this.end = string.length();
+ }
+
+ @Override
+ public void cancel() {
+ cancelled = true;
+ }
+
+ @Override
+ public void request(long n) {
+ if (SubscriptionHelper.validate(n)) {
+ if (BackpressureHelper.add(this, n) == 0) {
+ if (n == Long.MAX_VALUE) {
+ fastPath();
+ } else {
+ slowPath(n);
+ }
+ }
+ }
+ }
+
+ void fastPath() {
+ int e = end;
+ CharSequence s = string;
+ Subscriber super Integer> a = downstream;
+
+ for (int i = index; i != e; i++) {
+ if (cancelled) {
+ return;
+ }
+
+ a.onNext((int)s.charAt(i));
+ }
+
+ if (!cancelled) {
+ a.onComplete();
+ }
+ }
+
+ void slowPath(long r) {
+ long e = 0L;
+ int i = index;
+ int f = end;
+ CharSequence s = string;
+ Subscriber super Integer> a = downstream;
+
+ for (;;) {
+
+ while (e != r && i != f) {
+ if (cancelled) {
+ return;
+ }
+
+ a.onNext((int)s.charAt(i));
+
+ i++;
+ e++;
+ }
+
+ if (i == f) {
+ if (!cancelled) {
+ a.onComplete();
+ }
+ return;
+ }
+
+ r = get();
+ if (e == r) {
+ index = i;
+ r = addAndGet(-e);
+ if (r == 0L) {
+ break;
+ }
+ e = 0L;
+ }
+ }
+ }
+
+ @Override
+ public int requestFusion(int requestedMode) {
+ return requestedMode & QueueFuseable.SYNC;
+ }
+
+ @Override
+ public Integer poll() {
+ int i = index;
+ if (i != end) {
+ index = i + 1;
+ return (int)string.charAt(i);
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return index == end;
+ }
+
+ @Override
+ public void clear() {
+ index = end;
+ }
+ }
+
+}
diff --git a/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/FlowableSplit.java b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/FlowableSplit.java
new file mode 100644
index 0000000000..83c203e42f
--- /dev/null
+++ b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/FlowableSplit.java
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow.scrabble.optimizations;
+
+import io.reactivex.Flowable;
+import io.reactivex.FlowableTransformer;
+import io.reactivex.exceptions.Exceptions;
+import io.reactivex.internal.fuseable.ConditionalSubscriber;
+import io.reactivex.internal.fuseable.SimplePlainQueue;
+import io.reactivex.internal.queue.SpscArrayQueue;
+import io.reactivex.internal.subscriptions.SubscriptionHelper;
+import io.reactivex.internal.util.BackpressureHelper;
+import io.reactivex.plugins.RxJavaPlugins;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+final class FlowableSplit extends Flowable implements FlowableTransformer {
+
+ final Publisher source;
+
+ final Pattern pattern;
+
+ final int bufferSize;
+
+ FlowableSplit(Publisher source, Pattern pattern, int bufferSize) {
+ this.source = source;
+ this.pattern = pattern;
+ this.bufferSize = bufferSize;
+ }
+
+ @Override
+ public Publisher apply(Flowable upstream) {
+ return new FlowableSplit(upstream, pattern, bufferSize);
+ }
+
+ @Override
+ protected void subscribeActual(Subscriber super String> s) {
+ source.subscribe(new SplitSubscriber(s, pattern, bufferSize));
+ }
+
+ static final class SplitSubscriber
+ extends AtomicInteger
+ implements ConditionalSubscriber, Subscription {
+
+ static final String[] EMPTY = new String[0];
+
+ private static final long serialVersionUID = -5022617259701794064L;
+
+ final Subscriber super String> downstream;
+
+ final Pattern pattern;
+
+ final SimplePlainQueue queue;
+
+ final AtomicLong requested;
+
+ final int bufferSize;
+
+ final int limit;
+
+ Subscription upstream;
+
+ volatile boolean cancelled;
+
+ String leftOver;
+
+ String[] current;
+
+ int index;
+
+ int produced;
+
+ volatile boolean done;
+ Throwable error;
+
+ int empty;
+
+ SplitSubscriber(Subscriber super String> downstream, Pattern pattern, int bufferSize) {
+ this.downstream = downstream;
+ this.pattern = pattern;
+ this.bufferSize = bufferSize;
+ this.limit = bufferSize - (bufferSize >> 2);
+ this.queue = new SpscArrayQueue(bufferSize);
+ this.requested = new AtomicLong();
+ }
+
+ @Override
+ public void request(long n) {
+ if (SubscriptionHelper.validate(n)) {
+ BackpressureHelper.add(requested, n);
+ drain();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ cancelled = true;
+ upstream.cancel();
+
+ if (getAndIncrement() == 0) {
+ current = null;
+ queue.clear();
+ }
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ if (SubscriptionHelper.validate(this.upstream, s)) {
+ this.upstream = s;
+
+ downstream.onSubscribe(this);
+
+ s.request(bufferSize);
+ }
+ }
+
+ @Override
+ public void onNext(String t) {
+ if (!tryOnNext(t)) {
+ upstream.request(1);
+ }
+ }
+
+ @Override
+ public boolean tryOnNext(String t) {
+ String lo = leftOver;
+ String[] a;
+ try {
+ if (lo == null || lo.isEmpty()) {
+ a = pattern.split(t, -1);
+ } else {
+ a = pattern.split(lo + t, -1);
+ }
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ this.upstream.cancel();
+ onError(ex);
+ return true;
+ }
+
+ if (a.length == 0) {
+ leftOver = null;
+ return false;
+ } else
+ if (a.length == 1) {
+ leftOver = a[0];
+ return false;
+ }
+ leftOver = a[a.length - 1];
+ queue.offer(a);
+ drain();
+ return true;
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (done) {
+ RxJavaPlugins.onError(t);
+ return;
+ }
+ String lo = leftOver;
+ if (lo != null && !lo.isEmpty()) {
+ leftOver = null;
+ queue.offer(new String[] { lo, null });
+ }
+ error = t;
+ done = true;
+ drain();
+ }
+
+ @Override
+ public void onComplete() {
+ if (!done) {
+ done = true;
+ String lo = leftOver;
+ if (lo != null && !lo.isEmpty()) {
+ leftOver = null;
+ queue.offer(new String[] { lo, null });
+ }
+ drain();
+ }
+ }
+
+ void drain() {
+ if (getAndIncrement() != 0) {
+ return;
+ }
+
+ SimplePlainQueue q = queue;
+
+ int missed = 1;
+ int consumed = produced;
+ String[] array = current;
+ int idx = index;
+ int emptyCount = empty;
+
+ Subscriber super String> a = downstream;
+
+ for (;;) {
+ long r = requested.get();
+ long e = 0;
+
+ while (e != r) {
+ if (cancelled) {
+ current = null;
+ q.clear();
+ return;
+ }
+
+ boolean d = done;
+
+ if (array == null) {
+ array = q.poll();
+ if (array != null) {
+ current = array;
+ if (++consumed == limit) {
+ consumed = 0;
+ upstream.request(limit);
+ }
+ }
+ }
+
+ boolean empty = array == null;
+
+ if (d && empty) {
+ current = null;
+ Throwable ex = error;
+ if (ex != null) {
+ a.onError(ex);
+ } else {
+ a.onComplete();
+ }
+ return;
+ }
+
+ if (empty) {
+ break;
+ }
+
+ if (array.length == idx + 1) {
+ array = null;
+ current = null;
+ idx = 0;
+ continue;
+ }
+
+ String v = array[idx];
+
+ if (v.isEmpty()) {
+ emptyCount++;
+ idx++;
+ } else {
+ while (emptyCount != 0 && e != r) {
+ if (cancelled) {
+ current = null;
+ q.clear();
+ return;
+ }
+ a.onNext("");
+ e++;
+ emptyCount--;
+ }
+
+ if (e != r && emptyCount == 0) {
+ a.onNext(v);
+
+ e++;
+ idx++;
+ }
+ }
+ }
+
+ if (e == r) {
+ if (cancelled) {
+ current = null;
+ q.clear();
+ return;
+ }
+
+ boolean d = done;
+
+ if (array == null) {
+ array = q.poll();
+ if (array != null) {
+ current = array;
+ if (++consumed == limit) {
+ consumed = 0;
+ upstream.request(limit);
+ }
+ }
+ }
+
+ boolean empty = array == null;
+
+ if (d && empty) {
+ current = null;
+ Throwable ex = error;
+ if (ex != null) {
+ a.onError(ex);
+ } else {
+ a.onComplete();
+ }
+ return;
+ }
+ }
+
+ if (e != 0L) {
+ BackpressureHelper.produced(requested, e);
+ }
+
+ empty = emptyCount;
+ produced = consumed;
+ missed = addAndGet(-missed);
+ if (missed == 0) {
+ break;
+ }
+ }
+ }
+ }
+}
diff --git a/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/StringFlowable.java b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/StringFlowable.java
new file mode 100644
index 0000000000..3d36a0d8e7
--- /dev/null
+++ b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/StringFlowable.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow.scrabble.optimizations;
+
+import io.reactivex.Flowable;
+import io.reactivex.FlowableTransformer;
+import io.reactivex.internal.functions.ObjectHelper;
+import io.reactivex.plugins.RxJavaPlugins;
+
+import java.util.regex.Pattern;
+
+public final class StringFlowable {
+ /** Utility class. */
+ private StringFlowable() {
+ throw new IllegalStateException("No instances!");
+ }
+
+ /**
+ * Signals each character of the given string CharSequence as Integers.
+ * @param string the source of characters
+ * @return the new Flowable instance
+ */
+ public static Flowable characters(CharSequence string) {
+ ObjectHelper.requireNonNull(string, "string is null");
+ return RxJavaPlugins.onAssembly(new FlowableCharSequence(string));
+ }
+
+ /**
+ * Splits the input sequence of strings based on a pattern even across subsequent
+ * elements if needed.
+ * @param pattern the Rexexp pattern to split along
+ * @return the new FlowableTransformer instance
+ *
+ * @since 0.13.0
+ */
+ public static FlowableTransformer split(Pattern pattern) {
+ return split(pattern, Flowable.bufferSize());
+ }
+
+ /**
+ * Splits the input sequence of strings based on a pattern even across subsequent
+ * elements if needed.
+ * @param pattern the Rexexp pattern to split along
+ * @param bufferSize the number of items to prefetch from the upstream
+ * @return the new FlowableTransformer instance
+ *
+ * @since 0.13.0
+ */
+ public static FlowableTransformer split(Pattern pattern, int bufferSize) {
+ ObjectHelper.requireNonNull(pattern, "pattern is null");
+ ObjectHelper.verifyPositive(bufferSize, "bufferSize");
+ return new FlowableSplit(null, pattern, bufferSize);
+ }
+
+ /**
+ * Splits the input sequence of strings based on a pattern even across subsequent
+ * elements if needed.
+ * @param pattern the Rexexp pattern to split along
+ * @return the new FlowableTransformer instance
+ *
+ * @since 0.13.0
+ */
+ public static FlowableTransformer split(String pattern) {
+ return split(pattern, Flowable.bufferSize());
+ }
+
+ /**
+ * Splits the input sequence of strings based on a pattern even across subsequent
+ * elements if needed.
+ * @param pattern the Rexexp pattern to split along
+ * @param bufferSize the number of items to prefetch from the upstream
+ * @return the new FlowableTransformer instance
+ *
+ * @since 0.13.0
+ */
+ public static FlowableTransformer split(String pattern, int bufferSize) {
+ return split(Pattern.compile(pattern), bufferSize);
+ }
+
+}
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/Numbers.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/Numbers.kt
new file mode 100644
index 0000000000..1eb6fa4c6f
--- /dev/null
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/Numbers.kt
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+
+package benchmarks.flow.misc
+
+import benchmarks.flow.scrabble.flow
+import io.reactivex.*
+import io.reactivex.functions.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.openjdk.jmh.annotations.*
+import java.util.concurrent.*
+
+/*
+ * Results:
+ *
+ * // Throw FlowAborted overhead
+ * Numbers.primes avgt 7 3039.185 ± 25.598 us/op
+ * Numbers.primesRx avgt 7 2677.937 ± 17.720 us/op
+ *
+ * // On par
+ * Numbers.transformations avgt 7 16.207 ± 0.133 us/op
+ * Numbers.transformationsRx avgt 7 19.626 ± 0.135 us/op
+ *
+ * // Channels overhead
+ * Numbers.zip avgt 7 434.160 ± 7.014 us/op
+ * Numbers.zipRx avgt 7 87.898 ± 5.007 us/op
+ *
+ */
+@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@State(Scope.Benchmark)
+open class Numbers {
+
+ companion object {
+ private const val primes = 100
+ private const val natural = 1000
+ }
+
+ private fun numbers() = flow {
+ for (i in 2L..Long.MAX_VALUE) emit(i)
+ }
+
+ private fun primesFlow(): Flow = flow {
+ var source = numbers()
+ while (true) {
+ val next = source.take(1).single()
+ emit(next)
+ source = source.filter { it % next != 0L }
+ }
+ }
+
+ private fun rxNumbers() =
+ Flowable.generate(Callable { 1L }, BiFunction, Long> { state, emitter ->
+ val newState = state + 1
+ emitter.onNext(newState)
+ newState
+ })
+
+ private fun generateRxPrimes(): Flowable = Flowable.generate(Callable { rxNumbers() },
+ BiFunction, Emitter, Flowable> { state, emitter ->
+ // Not the most fair comparison, but here we go
+ val prime = state.firstElement().blockingGet()
+ emitter.onNext(prime)
+ state.filter { it % prime != 0L }
+ })
+
+ @Benchmark
+ fun primes() = runBlocking {
+ primesFlow().take(primes).count()
+ }
+
+ @Benchmark
+ fun primesRx() = generateRxPrimes().take(primes.toLong()).count().blockingGet()
+
+ @Benchmark
+ fun zip() = runBlocking {
+ val numbers = numbers().take(natural)
+ val first = numbers
+ .filter { it % 2L != 0L }
+ .map { it * it }
+ val second = numbers
+ .filter { it % 2L == 0L }
+ .map { it * it }
+ first.zip(second) { v1, v2 -> v1 + v2 }.filter { it % 3 == 0L }.count()
+ }
+
+ @Benchmark
+ fun zipRx() {
+ val numbers = rxNumbers().take(natural.toLong())
+ val first = numbers
+ .filter { it % 2L != 0L }
+ .map { it * it }
+ val second = numbers
+ .filter { it % 2L == 0L }
+ .map { it * it }
+ first.zipWith(second, BiFunction { v1, v2 -> v1 + v2 }).filter { it % 3 == 0L }.count()
+ .blockingGet()
+ }
+
+ @Benchmark
+ fun transformations(): Int = runBlocking {
+ numbers()
+ .take(natural)
+ .filter { it % 2L != 0L }
+ .map { it * it }
+ .filter { (it + 1) % 3 == 0L }.count()
+ }
+
+ @Benchmark
+ fun transformationsRx(): Long {
+ return rxNumbers().take(natural.toLong())
+ .filter { it % 2L != 0L }
+ .map { it * it }
+ .filter { (it + 1) % 3 == 0L }.count()
+ .blockingGet()
+ }
+}
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/SafeFlowBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/SafeFlowBenchmark.kt
new file mode 100644
index 0000000000..f62af91eb8
--- /dev/null
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/SafeFlowBenchmark.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow.misc
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.openjdk.jmh.annotations.*
+import java.util.concurrent.*
+import benchmarks.flow.scrabble.flow as unsafeFlow
+import kotlinx.coroutines.flow.flow as safeFlow
+
+@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@State(Scope.Benchmark)
+open class SafeFlowBenchmark {
+
+ private fun numbersSafe() = safeFlow {
+ for (i in 2L..1000L) emit(i)
+ }
+
+ private fun numbersUnsafe() = unsafeFlow {
+ for (i in 2L..1000L) emit(i)
+ }
+
+ @Benchmark
+ fun safeNumbers(): Int = runBlocking {
+ numbersSafe().count()
+ }
+
+ @Benchmark
+ fun unsafeNumbers(): Int = runBlocking {
+ numbersUnsafe().count()
+ }
+}
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleBase.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleBase.kt
new file mode 100644
index 0000000000..b556053b5d
--- /dev/null
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleBase.kt
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow.scrabble
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.openjdk.jmh.annotations.*
+import java.lang.Long.*
+import java.lang.Long.max
+import java.util.*
+import java.util.concurrent.*
+import kotlin.math.*
+
+@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+open class FlowPlaysScrabbleBase : ShakespearePlaysScrabble() {
+
+ @Benchmark
+ public override fun play(): List>> {
+ val scoreOfALetter = { letter: Int -> flowOf(letterScores[letter - 'a'.toInt()]) }
+
+ val letterScore = { entry: Map.Entry ->
+ flowOf(
+ letterScores[entry.key - 'a'.toInt()] * Integer.min(
+ entry.value.get().toInt(),
+ scrabbleAvailableLetters[entry.key - 'a'.toInt()]
+ )
+ )
+ }
+
+ val toIntegerStream = { string: String ->
+ IterableSpliterator.of(string.chars().boxed().spliterator()).asFlow()
+ }
+
+ val histoOfLetters = { word: String ->
+ flow {
+ emit(toIntegerStream(word).fold(HashMap()) { accumulator, value ->
+ var newValue: LongWrapper? = accumulator[value]
+ if (newValue == null) {
+ newValue = LongWrapper.zero()
+ }
+ accumulator[value] = newValue.incAndSet()
+ accumulator
+ })
+ }
+ }
+
+ val blank = { entry: Map.Entry ->
+ flowOf(max(0L, entry.value.get() - scrabbleAvailableLetters[entry.key - 'a'.toInt()]))
+ }
+
+ val nBlanks = { word: String ->
+ flow {
+ emit(histoOfLetters(word)
+ .flatMapConcat { map -> map.entries.iterator().asFlow() }
+ .flatMapConcat({ blank(it) })
+ .reduce { a, b -> a + b })
+ }
+ }
+
+ val checkBlanks = { word: String ->
+ nBlanks(word).flatMapConcat { l -> flowOf(l <= 2L) }
+ }
+
+ val score2 = { word: String ->
+ flow {
+ emit(histoOfLetters(word)
+ .flatMapConcat { map -> map.entries.iterator().asFlow() }
+ .flatMapConcat { letterScore(it) }
+ .reduce { a, b -> a + b })
+ }
+ }
+
+ val first3 = { word: String ->
+ IterableSpliterator.of(word.chars().boxed().limit(3).spliterator()).asFlow()
+ }
+
+ val last3 = { word: String ->
+ IterableSpliterator.of(word.chars().boxed().skip(3).spliterator()).asFlow()
+ }
+
+ val toBeMaxed = { word: String -> flowOf(first3(word), last3(word)).flattenConcat() }
+
+ // Bonus for double letter
+ val bonusForDoubleLetter = { word: String ->
+ flow {
+ emit(toBeMaxed(word)
+ .flatMapConcat { scoreOfALetter(it) }
+ .reduce { a, b -> max(a, b) })
+ }
+ }
+
+ val score3 = { word: String ->
+ flow {
+ emit(flowOf(
+ score2(word), score2(word),
+ bonusForDoubleLetter(word),
+ bonusForDoubleLetter(word),
+ flowOf(if (word.length == 7) 50 else 0)
+ ).flattenConcat().reduce { a, b -> a + b })
+ }
+ }
+
+ val buildHistoOnScore: (((String) -> Flow) -> Flow>>) = { score ->
+ flow {
+ emit(shakespeareWords.asFlow()
+ .filter({ scrabbleWords.contains(it) && checkBlanks(it).single() })
+ .fold(TreeMap>(Collections.reverseOrder())) { acc, value ->
+ val key = score(value).single()
+ var list = acc[key] as MutableList?
+ if (list == null) {
+ list = ArrayList()
+ acc[key] = list
+ }
+ list.add(value)
+ acc
+ })
+ }
+ }
+
+ return runBlocking {
+ buildHistoOnScore(score3)
+ .flatMapConcat { map -> map.entries.iterator().asFlow() }
+ .take(3)
+ .toList()
+ }
+ }
+}
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt
new file mode 100644
index 0000000000..921f390dce
--- /dev/null
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. and contributors Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow.scrabble
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.openjdk.jmh.annotations.*
+import java.lang.Long.max
+import java.util.*
+import java.util.concurrent.*
+import java.util.stream.*
+import kotlin.math.*
+
+@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+open class FlowPlaysScrabbleOpt : ShakespearePlaysScrabble() {
+
+ @Benchmark
+ public override fun play(): List>> {
+ val histoOfLetters = { word: String ->
+ flow {
+ emit(word.asFlow().fold(HashMap()) { accumulator, value ->
+ var newValue: MutableLong? = accumulator[value]
+ if (newValue == null) {
+ newValue = MutableLong()
+ accumulator[value] = newValue
+ }
+ newValue.incAndSet()
+ accumulator
+ })
+ }
+ }
+
+ val blank = { entry: Map.Entry ->
+ max(0L, entry.value.get() - scrabbleAvailableLetters[entry.key - 'a'.toInt()])
+ }
+
+ val nBlanks = { word: String ->
+ flow {
+ emit(histoOfLetters(word)
+ .flatMapConcatIterable { it.entries }
+ .map({ blank(it) })
+ .sum()
+ )
+ }
+ }
+
+ val checkBlanks = { word: String ->
+ nBlanks(word).map { it <= 2L }
+ }
+
+ val letterScore = { entry: Map.Entry ->
+ letterScores[entry.key - 'a'.toInt()] * Integer.min(
+ entry.value.get().toInt(),
+ scrabbleAvailableLetters[entry.key - 'a'.toInt()]
+ )
+ }
+
+ val score2 = { word: String ->
+ flow {
+ emit(histoOfLetters(word)
+ .flatMapConcatIterable { it.entries }
+ .map { letterScore(it) }
+ .sum())
+ }
+ }
+
+ val first3 = { word: String -> word.asFlow(endIndex = 3) }
+ val last3 = { word: String -> word.asFlow(startIndex = 3) }
+ val toBeMaxed = { word: String -> concat(first3(word), last3(word)) }
+
+ val bonusForDoubleLetter = { word: String ->
+ flow {
+ emit(toBeMaxed(word)
+ .map { letterScores[it.toInt() - 'a'.toInt()] }
+ .max())
+ }
+ }
+
+ val score3 = { word: String ->
+ flow {
+ val sum = score2(word).single() + bonusForDoubleLetter(word).single()
+ emit(sum * 2 + if (word.length == 7) 50 else 0)
+ }
+ }
+
+ val buildHistoOnScore: (((String) -> Flow) -> Flow>>) = { score ->
+ flow {
+ emit(shakespeareWords.asFlow()
+ .filter({ scrabbleWords.contains(it) && checkBlanks(it).single() })
+ .fold(TreeMap>(Collections.reverseOrder())) { acc, value ->
+ val key = score(value).single()
+ var list = acc[key] as MutableList?
+ if (list == null) {
+ list = ArrayList()
+ acc[key] = list
+ }
+ list.add(value)
+ acc
+ })
+ }
+ }
+
+ return runBlocking {
+ buildHistoOnScore(score3)
+ .flatMapConcatIterable { it.entries }
+ .take(3)
+ .toList()
+ }
+ }
+}
+
+public fun String.asFlow() = flow {
+ forEach {
+ emit(it.toInt())
+ }
+}
+
+public fun String.asFlow(startIndex: Int = 0, endIndex: Int = length) =
+ StringByCharFlow(this, startIndex, endIndex.coerceAtMost(this.length))
+
+public suspend inline fun Flow.sum(): Int {
+ val collector = object : FlowCollector {
+ public var sum = 0
+
+ override suspend fun emit(value: Int) {
+ sum += value
+ }
+ }
+ collect(collector)
+ return collector.sum
+}
+
+public suspend inline fun Flow.max(): Int {
+ val collector = object : FlowCollector {
+ public var max = 0
+
+ override suspend fun emit(value: Int) {
+ max = max(max, value)
+ }
+ }
+ collect(collector)
+ return collector.max
+}
+
+@JvmName("longSum")
+public suspend inline fun Flow.sum(): Long {
+ val collector = object : FlowCollector {
+ public var sum = 0L
+
+ override suspend fun emit(value: Long) {
+ sum += value
+ }
+ }
+ collect(collector)
+ return collector.sum
+}
+
+public class StringByCharFlow(private val source: String, private val startIndex: Int, private val endIndex: Int): Flow {
+ override suspend fun collect(collector: FlowCollector) {
+ for (i in startIndex until endIndex) collector.emit(source[i])
+ }
+}
+
+public fun concat(first: Flow, second: Flow): Flow = flow {
+ first.collect { value ->
+ return@collect emit(value)
+ }
+
+ second.collect { value ->
+ return@collect emit(value)
+ }
+}
+
+public fun Flow.flatMapConcatIterable(transformer: (T) -> Iterable): Flow = flow {
+ collect { value ->
+ transformer(value).forEach { r ->
+ emit(r)
+ }
+ }
+}
+
+public inline fun flow(@BuilderInference crossinline block: suspend FlowCollector.() -> Unit): Flow {
+ return object : Flow {
+ override suspend fun collect(collector: FlowCollector) {
+ collector.block()
+ }
+ }
+}
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/IterableSpliterator.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/IterableSpliterator.kt
new file mode 100644
index 0000000000..434ea1e19d
--- /dev/null
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/IterableSpliterator.kt
@@ -0,0 +1,12 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow.scrabble
+
+import java.util.*
+
+object IterableSpliterator {
+ @JvmStatic
+ public fun of(spliterator: Spliterator): Iterable = Iterable { Spliterators.iterator(spliterator) }
+}
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/README.md b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/README.md
new file mode 100644
index 0000000000..13e016fd8b
--- /dev/null
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/README.md
@@ -0,0 +1,42 @@
+## Reactive scrabble benchmarks
+
+This package contains reactive scrabble benchmarks.
+
+Reactive Scrabble benchmarks were originally developed by José Paumard and are [available](https://github.com/JosePaumard/jdk8-stream-rx-comparison-reloaded) under Apache 2.0,
+Flow version is adaptation of this work.
+All Rx and Reactive benchmarks are based on (or copied from) [David Karnok work](https://github.com/akarnokd/akarnokd-misc).
+
+### Benchmark classes
+
+The package (split into two sourcesets, `kotlin` and `java`), contains different benchmarks with different purposes
+
+ * `RxJava2PlaysScrabble` and `RxJava2PlaysScrabbleOpt` are copied as is and used for comparison. The infrastructure (e.g. `FlowableSplit`)
+ is copied from `akarnokd-misc` in order for the latter benchmark to work.
+ This is the original benchmark for `RxJava`.
+ * `ReactorPlaysScrabble` is an original benchmark for `Reactor`, but rewritten into Kotlin.
+ It is disabled by default and had the only purpose -- verify that Kotlin version performs as the original Java version
+ (which could have been different due to lambdas translation, implicit boxing, etc.). It is disabled because
+ it has almost no difference compared to `RxJava` benchmark.
+ * `FlowPlaysScrabbleBase` is a scrabble benchmark rewritten on top of the `Flow` API without using any optimizations or tricky internals.
+ * `FlowPlaysScrabbleOpt` is an optimized version of benchmark that follows the same guidelines as `RxJava2PlaysScrabbleOpt`: it still is
+ lazy, reactive and uses only `Flow` abstraction.
+ * `SequencePlaysScrabble` is a version of benchmark built on top of `Sequence` without suspensions, used as a lower bound.
+ * `SaneFlowPlaysScrabble` is a `SequencePlaysScrabble` that produces `Flow`.
+ This benchmark is not identical (in terms of functions pipelining) to `FlowPlaysScrabbleOpt`, but rather is used as a lower bound of `Flow` performance
+ on this particular task.
+
+### Results
+
+Benchmark results for throughput mode, Java `1.8.162`.
+Full command: `taskset -c 0,1 java -jar benchmarks.jar -f 2 -jvmArgsPrepend "-XX:+UseParallelGC" .*Scrabble.*`.
+
+```
+FlowPlaysScrabbleBase.play avgt 14 94.845 ± 1.345 ms/op
+FlowPlaysScrabbleOpt.play avgt 14 20.587 ± 0.173 ms/op
+
+RxJava2PlaysScrabble.play avgt 14 114.253 ± 3.450 ms/op
+RxJava2PlaysScrabbleOpt.play avgt 14 30.795 ± 0.144 ms/op
+
+SaneFlowPlaysScrabble.play avgt 14 18.825 ± 0.231 ms/op
+SequencePlaysScrabble.play avgt 14 13.787 ± 0.111 ms/op
+```
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/ReactorPlaysScrabble.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/ReactorPlaysScrabble.kt
new file mode 100644
index 0000000000..9adc4f1f59
--- /dev/null
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/ReactorPlaysScrabble.kt
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow.scrabble
+
+import org.openjdk.jmh.annotations.*
+import reactor.core.publisher.*
+import java.lang.Long.*
+import java.util.*
+import java.util.concurrent.*
+import java.util.function.Function
+
+/*@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)*/
+open class ReactorPlaysScrabble : ShakespearePlaysScrabble() {
+
+// @Benchmark
+ public override fun play(): List>> {
+ val scoreOfALetter = Function> { letter -> Flux.just(letterScores[letter - 'a'.toInt()]) }
+
+ val letterScore = Function, Flux> { entry ->
+ Flux.just(
+ letterScores[entry.key - 'a'.toInt()] * Integer.min(
+ entry.value.get().toInt(),
+ scrabbleAvailableLetters[entry.key - 'a'.toInt()]
+ )
+ )
+ }
+
+ val toIntegerStream = Function> { string ->
+ Flux.fromIterable(IterableSpliterator.of(string.chars().boxed().spliterator()))
+ }
+
+ val histoOfLetters = Function>> { word ->
+ Flux.from(toIntegerStream.apply(word)
+ .collect(
+ { HashMap() },
+ { map: HashMap, value: Int ->
+ var newValue: LongWrapper? = map[value]
+ if (newValue == null) {
+ newValue = LongWrapper.zero()
+ }
+ map[value] = newValue.incAndSet()
+ }
+
+ ))
+ }
+
+ val blank = Function, Flux> { entry ->
+ Flux.just(max(0L, entry.value.get() - scrabbleAvailableLetters[entry.key - 'a'.toInt()]))
+ }
+
+ val nBlanks = Function> { word ->
+ Flux.from(histoOfLetters.apply(word)
+ .flatMap> { map -> Flux.fromIterable>(Iterable { map.entries.iterator() }) }
+ .flatMap(blank)
+ .reduce { a, b -> sum(a, b) })
+ }
+
+ val checkBlanks = Function> { word ->
+ nBlanks.apply(word)
+ .flatMap { l -> Flux.just(l <= 2L) }
+ }
+
+
+ val score2 = Function> { word ->
+ Flux.from(histoOfLetters.apply(word)
+ .flatMap> { map -> Flux.fromIterable>(Iterable { map.entries.iterator() }) }
+ .flatMap(letterScore)
+ .reduce { a, b -> Integer.sum(a, b) })
+
+ }
+
+ val first3 = Function> { word -> Flux.fromIterable(
+ IterableSpliterator.of(
+ word.chars().boxed().limit(3).spliterator()
+ )
+ ) }
+ val last3 = Function> { word -> Flux.fromIterable(
+ IterableSpliterator.of(
+ word.chars().boxed().skip(3).spliterator()
+ )
+ ) }
+
+ val toBeMaxed = Function> { word ->
+ Flux.just(first3.apply(word), last3.apply(word))
+ .flatMap { Stream -> Stream }
+ }
+
+ // Bonus for double letter
+ val bonusForDoubleLetter = Function> { word ->
+ Flux.from(toBeMaxed.apply(word)
+ .flatMap(scoreOfALetter)
+ .reduce { a, b -> Integer.max(a, b) }
+ )
+ }
+
+ val score3 = Function> { word ->
+ Flux.from(Flux.just(
+ score2.apply(word),
+ score2.apply(word),
+ bonusForDoubleLetter.apply(word),
+ bonusForDoubleLetter.apply(word),
+ Flux.just(if (word.length == 7) 50 else 0)
+ )
+ .flatMap { Stream -> Stream }
+ .reduce { a, b -> Integer.sum(a, b) })
+ }
+
+ val buildHistoOnScore = Function>, Flux>>> { score ->
+ Flux.from(Flux.fromIterable(Iterable { shakespeareWords.iterator() })
+ .filter( { scrabbleWords.contains(it) })
+ .filter({ word -> checkBlanks.apply(word).toIterable().iterator().next() })
+ .collect(
+ { TreeMap>(Collections.reverseOrder()) },
+ { map: TreeMap>, word: String ->
+ val key = score.apply(word).toIterable().iterator().next()
+ var list = map[key] as MutableList?
+ if (list == null) {
+ list = ArrayList()
+ map[key] = list
+ }
+ list.add(word)
+ }
+ ))
+ }
+
+ val finalList2 = Flux.from>>>(buildHistoOnScore.apply(score3)
+ .flatMap>> { map -> Flux.fromIterable>>(Iterable { map.entries.iterator() }) }
+ .take(3)
+ .collect>>>(
+ { ArrayList() },
+ { list, entry -> list.add(entry) }
+ )
+ ).toIterable().iterator().next()
+
+ return finalList2
+ }
+
+}
+
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SaneFlowPlaysScrabble.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SaneFlowPlaysScrabble.kt
new file mode 100644
index 0000000000..597667c1c6
--- /dev/null
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SaneFlowPlaysScrabble.kt
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow.scrabble
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.openjdk.jmh.annotations.*
+import java.lang.Long.*
+import java.util.*
+import java.util.concurrent.*
+
+@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+open class SaneFlowPlaysScrabble : ShakespearePlaysScrabble() {
+
+ @Benchmark
+ public override fun play(): List>> {
+ val score3: suspend (String) -> Int = { word: String ->
+ val sum = score2(word) + bonusForDoubleLetter(word)
+ sum * 2 + if (word.length == 7) 50 else 0
+ }
+
+ val buildHistoOnScore: ((suspend (String) -> Int) -> Flow>>) = { score ->
+ flow {
+ emit(shakespeareWords.asFlow()
+ .filter({ scrabbleWords.contains(it) && checkBlanks(it) })
+ .fold(TreeMap>(Collections.reverseOrder())) { acc, value ->
+ val key = score(value)
+ var list = acc[key] as MutableList?
+ if (list == null) {
+ list = ArrayList()
+ acc[key] = list
+ }
+ list.add(value)
+ acc
+ })
+ }
+ }
+
+ return runBlocking {
+ buildHistoOnScore(score3)
+ .flatMapConcatIterable { it.entries }
+ .take(3)
+ .toList()
+ }
+ }
+
+ private suspend inline fun score2(word: String): Int {
+ return buildHistogram(word)
+ .map { it.letterScore() }
+ .sum()
+ }
+
+ private suspend inline fun bonusForDoubleLetter(word: String): Int {
+ return toBeMaxed(word)
+ .map { letterScores[it - 'a'.toInt()] }
+ .max()
+ }
+
+ private fun Map.Entry.letterScore(): Int = letterScores[key - 'a'.toInt()] * Integer.min(
+ value.get().toInt(),
+ scrabbleAvailableLetters[key - 'a'.toInt()])
+
+ private fun toBeMaxed(word: String) = concat(word.asSequence(), word.asSequence(endIndex = 3))
+
+ private suspend inline fun checkBlanks(word: String) = numBlanks(word) <= 2L
+
+ private suspend fun numBlanks(word: String): Long {
+ return buildHistogram(word)
+ .map { blanks(it) }
+ .sum()
+ }
+
+ private fun blanks(entry: Map.Entry): Long =
+ max(0L, entry.value.get() - scrabbleAvailableLetters[entry.key - 'a'.toInt()])
+
+ private suspend inline fun buildHistogram(word: String): HashMap {
+ return word.asSequence().fold(HashMap()) { accumulator, value ->
+ var newValue: MutableLong? = accumulator[value]
+ if (newValue == null) {
+ newValue = MutableLong()
+ accumulator[value] = newValue
+ }
+ newValue.incAndSet()
+ accumulator
+ }
+ }
+
+ private fun String.asSequence(startIndex: Int = 0, endIndex: Int = length) = flow {
+ for (i in startIndex until endIndex.coerceAtMost(length)) {
+ emit(get(i).toInt())
+ }
+ }
+}
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt
new file mode 100644
index 0000000000..5f4f4c2d1a
--- /dev/null
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow.scrabble
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.openjdk.jmh.annotations.*
+import java.lang.Long.*
+import java.util.*
+import java.util.concurrent.*
+
+@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+open class SequencePlaysScrabble : ShakespearePlaysScrabble() {
+
+ @Benchmark
+ public override fun play(): List>> {
+ val score2: (String) -> Int = { word: String ->
+ buildHistogram(word)
+ .map { it.letterScore() }
+ .sum()
+ }
+
+ val bonusForDoubleLetter: (String) -> Int = { word: String ->
+ toBeMaxed(word)
+ .map { letterScores[it - 'a'.toInt()] }
+ .max()!!
+ }
+
+ val score3: (String) -> Int = { word: String ->
+ val sum = score2(word) + bonusForDoubleLetter(word)
+ sum * 2 + if (word.length == 7) 50 else 0
+ }
+
+ val buildHistoOnScore: (((String) -> Int) -> Flow>>) = { score ->
+ flow {
+ emit(shakespeareWords.asSequence()
+ .filter({ scrabbleWords.contains(it) && checkBlanks(it) })
+ .fold(TreeMap>(Collections.reverseOrder())) { acc, value ->
+ val key = score(value)
+ var list = acc[key] as MutableList?
+ if (list == null) {
+ list = ArrayList()
+ acc[key] = list
+ }
+ list.add(value)
+ acc
+ })
+ }
+ }
+
+ return runBlocking {
+ buildHistoOnScore(score3)
+ .flatMapConcatIterable { it.entries }
+ .take(3)
+ .toList()
+ }
+ }
+
+ private fun Map.Entry.letterScore(): Int = letterScores[key - 'a'.toInt()] * Integer.min(
+ value.get().toInt(),
+ scrabbleAvailableLetters[key - 'a'.toInt()])
+
+ private fun toBeMaxed(word: String) = word.asSequence(startIndex = 3) + word.asSequence(endIndex = 3)
+
+ private fun checkBlanks(word: String) = numBlanks(word) <= 2L
+
+ private fun numBlanks(word: String): Long {
+ return buildHistogram(word)
+ .map { blanks(it) }
+ .sum()
+ }
+
+ private fun blanks(entry: Map.Entry): Long =
+ max(0L, entry.value.get() - scrabbleAvailableLetters[entry.key - 'a'.toInt()])
+
+ private fun buildHistogram(word: String): HashMap {
+ return word.asSequence().fold(HashMap()) { accumulator, value ->
+ var newValue: MutableLong? = accumulator[value]
+ if (newValue == null) {
+ newValue = MutableLong()
+ accumulator[value] = newValue
+ }
+ newValue.incAndSet()
+ accumulator
+ }
+ }
+
+ private fun String.asSequence(startIndex: Int = 0, endIndex: Int = length) = object : Sequence {
+ override fun iterator(): Iterator = object : Iterator {
+ private val _endIndex = endIndex.coerceAtMost(length)
+ private var currentIndex = startIndex
+ override fun hasNext(): Boolean = currentIndex < _endIndex
+ override fun next(): Int = get(currentIndex++).toInt()
+ }
+ }
+}
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/ShakespearePlaysScrabble.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/ShakespearePlaysScrabble.kt
new file mode 100644
index 0000000000..7eaa3f0a5d
--- /dev/null
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/ShakespearePlaysScrabble.kt
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package benchmarks.flow.scrabble
+
+import org.openjdk.jmh.annotations.*
+import java.io.*
+import java.util.stream.*
+import java.util.zip.*
+
+@State(Scope.Benchmark)
+abstract class ShakespearePlaysScrabble {
+ @Throws(Exception::class)
+ abstract fun play(): List>>
+
+ public class MutableLong {
+ var value: Long = 0
+ fun get(): Long {
+ return value
+ }
+
+ fun incAndSet(): MutableLong {
+ value++
+ return this
+ }
+
+ fun add(other: MutableLong): MutableLong {
+ value += other.value
+ return this
+ }
+ }
+
+ public interface LongWrapper {
+ fun get(): Long
+
+ @JvmDefault
+ fun incAndSet(): LongWrapper {
+ return object : LongWrapper {
+ override fun get(): Long = this@LongWrapper.get() + 1L
+ }
+ }
+
+ @JvmDefault
+ fun add(other: LongWrapper): LongWrapper {
+ return object : LongWrapper {
+ override fun get(): Long = this@LongWrapper.get() + other.get()
+ }
+ }
+
+ companion object {
+ fun zero(): LongWrapper {
+ return object : LongWrapper {
+ override fun get(): Long = 0L
+ }
+ }
+ }
+ }
+
+ @JvmField
+ public val letterScores: IntArray = intArrayOf(1, 3, 3, 2, 1, 4, 2, 4, 1, 8, 5, 1, 3, 1, 1, 3, 10, 1, 1, 1, 1, 4, 4, 8, 4, 10)
+
+ @JvmField
+ public val scrabbleAvailableLetters: IntArray =
+ intArrayOf(9, 2, 2, 1, 12, 2, 3, 2, 9, 1, 1, 4, 2, 6, 8, 2, 1, 6, 4, 6, 4, 2, 2, 1, 2, 1)
+
+ @JvmField
+ public val scrabbleWords: Set = readResource("ospd.txt.gz")
+
+ @JvmField
+ public val shakespeareWords: Set = readResource("words.shakespeare.txt.gz")
+
+ private fun readResource(path: String) =
+ BufferedReader(InputStreamReader(GZIPInputStream(this.javaClass.classLoader.getResourceAsStream(path)))).lines()
+ .map { it.toLowerCase() }.collect(Collectors.toSet())
+
+ init {
+ val expected = listOf(120 to listOf("jezebel", "quickly"),
+ 118 to listOf("zephyrs"), 116 to listOf("equinox"))
+ val actual = play().map { it.key to it.value }
+ if (expected != actual) {
+ error("Incorrect benchmark, output: $actual")
+ }
+ }
+}
diff --git a/benchmarks/src/jmh/resources/ospd.txt.gz b/benchmarks/src/jmh/resources/ospd.txt.gz
new file mode 100644
index 0000000000..8a3074e927
Binary files /dev/null and b/benchmarks/src/jmh/resources/ospd.txt.gz differ
diff --git a/benchmarks/src/jmh/resources/words.shakespeare.txt.gz b/benchmarks/src/jmh/resources/words.shakespeare.txt.gz
new file mode 100644
index 0000000000..456f56cbeb
Binary files /dev/null and b/benchmarks/src/jmh/resources/words.shakespeare.txt.gz differ
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index 366906acd6..558b62ed8e 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -792,7 +792,6 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
- public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -814,6 +813,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;II)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flowOf (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOf ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flowOn$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -821,7 +821,6 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static synthetic fun flowViaChannel$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flowWith$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
- public static final fun fold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
diff --git a/gradle.properties b/gradle.properties
index bc2a75778a..13b510dcef 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,7 +1,7 @@
# Kotlin
version=1.2.1-SNAPSHOT
group=org.jetbrains.kotlinx
-kotlin_version=1.3.30
+kotlin_version=1.3.31
# Dependencies
junit_version=4.12
diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt
index 1416589854..06a5c00e2f 100644
--- a/kotlinx-coroutines-core/common/src/flow/Builders.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt
@@ -58,7 +58,7 @@ public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit
*/
@FlowPreview
@PublishedApi
-internal fun unsafeFlow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow {
+internal inline fun unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector.() -> Unit): Flow {
return object : Flow {
override suspend fun collect(collector: FlowCollector) {
collector.block()
@@ -127,6 +127,18 @@ public fun flowOf(vararg elements: T): Flow = unsafeFlow {
}
}
+/**
+ * Creates flow that produces a given [value].
+ */
+@FlowPreview
+public fun flowOf(value: T): Flow = unsafeFlow {
+ /*
+ * Implementation note: this is just an "optimized" overload of flowOf(vararg)
+ * which significantly reduce the footprint of widespread single-value flows.
+ */
+ emit(value)
+}
+
/**
* Returns an empty flow.
*/
@@ -141,7 +153,7 @@ private object EmptyFlow : Flow {
* Creates a flow that produces values from the given array.
*/
@FlowPreview
-public fun Array.asFlow(): Flow = flow {
+public fun Array.asFlow(): Flow = unsafeFlow {
forEach { value ->
emit(value)
}
@@ -151,7 +163,7 @@ public fun Array.asFlow(): Flow = flow {
* Creates flow that produces values from the given array.
*/
@FlowPreview
-public fun IntArray.asFlow(): Flow = flow {
+public fun IntArray.asFlow(): Flow = unsafeFlow {
forEach { value ->
emit(value)
}
@@ -161,7 +173,7 @@ public fun IntArray.asFlow(): Flow = flow {
* Creates flow that produces values from the given array.
*/
@FlowPreview
-public fun LongArray.asFlow(): Flow = flow {
+public fun LongArray.asFlow(): Flow = unsafeFlow {
forEach { value ->
emit(value)
}
@@ -171,7 +183,7 @@ public fun LongArray.asFlow(): Flow = flow {
* Creates flow that produces values from the given range.
*/
@FlowPreview
-public fun IntRange.asFlow(): Flow = flow {
+public fun IntRange.asFlow(): Flow = unsafeFlow {
forEach { value ->
emit(value)
}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt
similarity index 64%
rename from kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.kt
rename to kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt
index e432c18f8c..6d5a4b4d4d 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt
@@ -10,7 +10,4 @@ import kotlinx.coroutines.*
* This exception is thrown when operator need no more elements from the flow.
* This exception should never escape outside of operator's implementation.
*/
-internal class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") {
- // TODO expect/actual
- // override fun fillInStackTrace(): Throwable = this
-}
\ No newline at end of file
+internal expect class AbortFlowException() : CancellationException
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
index 3dc021b635..17c1a4c4d6 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
@@ -50,7 +50,7 @@ public fun Flow.flowOn(flowContext: CoroutineContext, bufferSize: Int = 1
coroutineScope {
val channel = produce(flowContext, capacity = bufferSize) {
collect { value ->
- send(value)
+ return@collect send(value)
}
}
channel.consumeEach { value ->
@@ -98,7 +98,7 @@ public fun Flow.flowWith(
val originalContext = coroutineContext.minusKey(Job)
val prepared = source.flowOn(originalContext, bufferSize)
builder(prepared).flowOn(flowContext, bufferSize).collect { value ->
- emit(value)
+ return@collect emit(value)
}
}
}
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
index 3818efeb16..aff523dd99 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
@@ -26,10 +26,11 @@ import kotlinx.coroutines.flow.unsafeFlow as flow
* ```
*/
@FlowPreview
-public fun Flow.transform(@BuilderInference transform: suspend FlowCollector.(value: T) -> Unit): Flow {
+public inline fun Flow.transform(@BuilderInference crossinline transform: suspend FlowCollector.(value: T) -> Unit): Flow {
return flow {
collect { value ->
- transform(value)
+ // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
+ return@collect transform(value)
}
}
}
@@ -38,9 +39,9 @@ public fun Flow.transform(@BuilderInference transform: suspend FlowCol
* Returns a flow containing only values of the original flow that matches the given [predicate].
*/
@FlowPreview
-public fun Flow.filter(predicate: suspend (T) -> Boolean): Flow = flow {
+public inline fun Flow.filter(crossinline predicate: suspend (T) -> Boolean): Flow = flow {
collect { value ->
- if (predicate(value)) emit(value)
+ if (predicate(value)) return@collect emit(value)
}
}
@@ -48,9 +49,9 @@ public fun Flow.filter(predicate: suspend (T) -> Boolean): Flow = flow
* Returns a flow containing only values of the original flow that do not match the given [predicate].
*/
@FlowPreview
-public fun Flow.filterNot(predicate: suspend (T) -> Boolean): Flow = flow {
+public inline fun Flow.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow = flow {
collect { value ->
- if (!predicate(value)) emit(value)
+ if (!predicate(value)) return@collect emit(value)
}
}
@@ -66,24 +67,24 @@ public inline fun Flow<*>.filterIsInstance(): Flow = filter { it
*/
@FlowPreview
public fun Flow.filterNotNull(): Flow = flow {
- collect { value -> if (value != null) emit(value) }
+ collect { value -> if (value != null) return@collect emit(value) }
}
/**
* Returns a flow containing the results of applying the given [transform] function to each value of the original flow.
*/
@FlowPreview
-public fun Flow.map(transform: suspend (value: T) -> R): Flow = transform { value ->
- emit(transform(value))
+public inline fun Flow.map(crossinline transform: suspend (value: T) -> R): Flow = transform { value ->
+ return@transform emit(transform(value))
}
/**
* Returns a flow that contains only non-null results of applying the given [transform] function to each value of the original flow.
*/
@FlowPreview
-public fun Flow.mapNotNull(transform: suspend (value: T) -> R?): Flow = transform { value ->
+public inline fun Flow.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow = transform { value ->
val transformed = transform(value) ?: return@transform
- emit(transformed)
+ return@transform emit(transformed)
}
/**
diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
index d0e04ff286..624b51f683 100644
--- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
+++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
@@ -28,7 +28,7 @@ import kotlin.jvm.*
* ```
*/
@FlowPreview
-public suspend fun Flow.collect(action: suspend (value: T) -> Unit): Unit =
+public suspend inline fun Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector {
override suspend fun emit(value: T) = action(value)
})
diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
index ac3c93cc0d..4afd0959b7 100644
--- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
+++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
@@ -38,9 +38,9 @@ public suspend fun Flow.reduce(operation: suspend (accumulator: S,
* Accumulates value starting with [initial] value and applying [operation] current accumulator value and each element
*/
@FlowPreview
-public suspend fun Flow.fold(
+public suspend inline fun Flow.fold(
initial: R,
- operation: suspend (acc: R, value: T) -> R
+ crossinline operation: suspend (acc: R, value: T) -> R
): R {
var accumulator = initial
collect { value ->
diff --git a/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt
new file mode 100644
index 0000000000..d6a9c31eaa
--- /dev/null
+++ b/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt
@@ -0,0 +1,9 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+
+internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed")
diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt
new file mode 100644
index 0000000000..7ff34e735b
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt
@@ -0,0 +1,11 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+
+internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") {
+ override fun fillInStackTrace(): Throwable = this
+}
diff --git a/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt
new file mode 100644
index 0000000000..d6a9c31eaa
--- /dev/null
+++ b/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt
@@ -0,0 +1,9 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+
+internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed")
diff --git a/kotlinx-coroutines-debug/test/CoroutinesDumpTest.kt b/kotlinx-coroutines-debug/test/CoroutinesDumpTest.kt
index 35e9d1e9f2..ec727014cb 100644
--- a/kotlinx-coroutines-debug/test/CoroutinesDumpTest.kt
+++ b/kotlinx-coroutines-debug/test/CoroutinesDumpTest.kt
@@ -24,7 +24,6 @@ class CoroutinesDumpTest : DebugTestBase() {
"Coroutine \"coroutine#1\":DeferredCoroutine{Active}@1e4a7dd4, state: SUSPENDED\n" +
"\tat kotlinx.coroutines.debug.CoroutinesDumpTest.sleepingNestedMethod(CoroutinesDumpTest.kt:95)\n" +
"\tat kotlinx.coroutines.debug.CoroutinesDumpTest.sleepingOuterMethod(CoroutinesDumpTest.kt:88)\n" +
- "\tat kotlinx.coroutines.debug.CoroutinesDumpTest\$testSuspendedCoroutine\$1\$deferred\$1.invokeSuspend(CoroutinesDumpTest.kt:29)\n" +
"\t(Coroutine creation stacktrace)\n" +
"\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)\n" +
"\tat kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:23)\n" +
@@ -39,13 +38,13 @@ class CoroutinesDumpTest : DebugTestBase() {
fun testRunningCoroutine() = synchronized(monitor) {
val deferred = GlobalScope.async {
activeMethod(shouldSuspend = false)
+ assertTrue(true)
}
awaitCoroutineStarted()
verifyDump(
- "Coroutine \"coroutine#1\":DeferredCoroutine{Active}@1e4a7dd4, state: RUNNING (Last suspension stacktrace, not an actual stacktrace)\n" +
- "\tat kotlinx.coroutines.debug.CoroutinesDumpTest\$testRunningCoroutine\$1\$deferred\$1.invokeSuspend(CoroutinesDumpTest.kt:49)\n" +
- "\t(Coroutine creation stacktrace)\n" +
+ "Coroutine \"coroutine#1\":DeferredCoroutine{Active}@227d9994, state: RUNNING (Last suspension stacktrace, not an actual stacktrace)\n" +
+ "\t(Coroutine creation stacktrace)\n" +
"\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)\n" +
"\tat kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:23)\n" +
"\tat kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:99)\n" +
@@ -72,7 +71,6 @@ class CoroutinesDumpTest : DebugTestBase() {
"\tat java.lang.Thread.sleep(Native Method)\n" +
"\tat kotlinx.coroutines.debug.CoroutinesDumpTest.nestedActiveMethod(CoroutinesDumpTest.kt:111)\n" +
"\tat kotlinx.coroutines.debug.CoroutinesDumpTest.activeMethod(CoroutinesDumpTest.kt:106)\n" +
- "\tat kotlinx.coroutines.debug.CoroutinesDumpTest\$testRunningCoroutineWithSuspensionPoint\$1\$deferred\$1.invokeSuspend(CoroutinesDumpTest.kt:71)\n" +
"\t(Coroutine creation stacktrace)\n" +
"\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)\n" +
"\tat kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:23)\n" +
diff --git a/kotlinx-coroutines-debug/test/DebugProbesTest.kt b/kotlinx-coroutines-debug/test/DebugProbesTest.kt
index 9dd4d7cec0..35d024178d 100644
--- a/kotlinx-coroutines-debug/test/DebugProbesTest.kt
+++ b/kotlinx-coroutines-debug/test/DebugProbesTest.kt
@@ -45,7 +45,6 @@ class DebugProbesTest : TestBase() {
"\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt)\n" +
"\tat kotlinx.coroutines.debug.DebugProbesTest.oneMoreNestedMethod(DebugProbesTest.kt:71)\n" +
"\tat kotlinx.coroutines.debug.DebugProbesTest.nestedMethod(DebugProbesTest.kt:66)\n" +
- "\tat kotlinx.coroutines.debug.DebugProbesTest\$testAsyncWithProbes\$1\$1.invokeSuspend(DebugProbesTest.kt:43)\n" +
"\t(Coroutine creation stacktrace)\n" +
"\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)\n" +
"\tat kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:23)\n" +
@@ -76,7 +75,6 @@ class DebugProbesTest : TestBase() {
"\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt)\n" +
"\tat kotlinx.coroutines.debug.DebugProbesTest.oneMoreNestedMethod(DebugProbesTest.kt:71)\n" +
"\tat kotlinx.coroutines.debug.DebugProbesTest.nestedMethod(DebugProbesTest.kt:66)\n" +
- "\tat kotlinx.coroutines.debug.DebugProbesTest\$testAsyncWithSanitizedProbes\$1\$1.invokeSuspend(DebugProbesTest.kt:43)\n" +
"\t(Coroutine creation stacktrace)\n" +
"\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)\n" +
"\tat kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:23)\n" +
diff --git a/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt b/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt
index 3ee80ad38f..c990c3e085 100644
--- a/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt
+++ b/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt
@@ -86,7 +86,6 @@ class SanitizedProbesTest : DebugTestBase() {
"\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)",
"Coroutine \"coroutine#2\":StandaloneCoroutine{Active}@1b68b9a4, state: SUSPENDED\n" +
- "\tat definitely.not.kotlinx.coroutines.SanitizedProbesTest\$launchSelector\$1\$1\$1.invokeSuspend(SanitizedProbesTest.kt:105)\n" +
"\tat definitely.not.kotlinx.coroutines.SanitizedProbesTest\$launchSelector\$1.invokeSuspend(SanitizedProbesTest.kt:143)\n" +
"\t(Coroutine creation stacktrace)\n" +
"\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)\n" +
diff --git a/ui/kotlinx-coroutines-android/animation-app/gradle.properties b/ui/kotlinx-coroutines-android/animation-app/gradle.properties
index bd2f170459..342b103ab2 100644
--- a/ui/kotlinx-coroutines-android/animation-app/gradle.properties
+++ b/ui/kotlinx-coroutines-android/animation-app/gradle.properties
@@ -18,6 +18,6 @@ org.gradle.jvmargs=-Xmx1536m
kotlin.coroutines=enable
-kotlin_version=1.3.30
+kotlin_version=1.3.31
coroutines_version=1.2.1
diff --git a/ui/kotlinx-coroutines-android/example-app/gradle.properties b/ui/kotlinx-coroutines-android/example-app/gradle.properties
index bd2f170459..342b103ab2 100644
--- a/ui/kotlinx-coroutines-android/example-app/gradle.properties
+++ b/ui/kotlinx-coroutines-android/example-app/gradle.properties
@@ -18,6 +18,6 @@ org.gradle.jvmargs=-Xmx1536m
kotlin.coroutines=enable
-kotlin_version=1.3.30
+kotlin_version=1.3.31
coroutines_version=1.2.1