diff --git a/README.md b/README.md index 2a0f1f3dec..e8e37a8c41 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.2.1) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.2.1) Library support for Kotlin coroutines with [multiplatform](#multiplatform) support. -This is a companion version for Kotlin `1.3.30` release. +This is a companion version for Kotlin `1.3.31` release. ```kotlin suspend fun main() = coroutineScope { @@ -89,7 +89,7 @@ And make sure that you use the latest Kotlin version: ```xml - 1.3.30 + 1.3.31 ``` @@ -107,7 +107,7 @@ And make sure that you use the latest Kotlin version: ```groovy buildscript { - ext.kotlin_version = '1.3.30' + ext.kotlin_version = '1.3.31' } ``` @@ -133,7 +133,7 @@ And make sure that you use the latest Kotlin version: ```groovy plugins { - kotlin("jvm") version "1.3.30" + kotlin("jvm") version "1.3.31" } ``` diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle index 728804ad4b..157eb88aa6 100644 --- a/benchmarks/build.gradle +++ b/benchmarks/build.gradle @@ -1,6 +1,8 @@ /* * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +sourceCompatibility = 1.8 +targetCompatibility = 1.8 apply plugin: "net.ltgt.apt" apply plugin: "com.github.johnrengelman.shadow" @@ -10,15 +12,50 @@ repositories { maven { url "https://repo.typesafe.com/typesafe/releases/" } } -jmh.jmhVersion = '1.21' +compileJmhKotlin { + kotlinOptions { + jvmTarget = "1.8" + freeCompilerArgs += ['-Xjvm-default=enable'] + } +} + +/* + * Due to a bug in the inliner it sometimes does not remove inlined symbols (that are later renamed) from unused code paths, + * and it breaks JMH that tries to post-process these symbols and fails because they are renamed. + */ +task removeRedundantFiles(type: Delete) { + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class" + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$1.class" + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$1.class" + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$1.class" + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$2\$1.class" + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$2\$1.class" + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$2\$1.class" + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$1\$1.class" + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$2\$1.class" + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class" + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class" + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class" + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble//SaneFlowPlaysScrabble\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class" + + // Primes + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$2\$1.class" + delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$1.class" +} + +jmhRunBytecodeGenerator.dependsOn(removeRedundantFiles) // It is better to use the following to run benchmarks, otherwise you may get unexpected errors: -// ../gradlew --no-daemon cleanJmhJar jmh +// ./gradlew --no-daemon cleanJmhJar jmh -Pjmh="MyBenchmark" jmh { + jmhVersion = '1.21' duplicateClassesStrategy DuplicatesStrategy.INCLUDE failOnError = true resultFormat = 'CSV' -// include = ['.*ChannelProducerConsumer.*'] + if (project.hasProperty('jmh')) { + include = ".*" + project.jmh + ".*" + } +// includeTests = false } jmhJar { @@ -29,8 +66,12 @@ jmhJar { } dependencies { + compile "org.openjdk.jmh:jmh-core:1.21" + compile "io.projectreactor:reactor-core:$reactor_vesion" + compile 'io.reactivex.rxjava2:rxjava:2.1.9' + compile "com.github.akarnokd:rxjava2-extensions:0.20.8" + compile "org.openjdk.jmh:jmh-core:1.21" compile 'com.typesafe.akka:akka-actor_2.12:2.5.0' compile project(':kotlinx-coroutines-core') } - diff --git a/benchmarks/src/jmh/java/benchmarks/flow/scrabble/RxJava2PlaysScrabble.java b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/RxJava2PlaysScrabble.java new file mode 100644 index 0000000000..2a85d0dbdd --- /dev/null +++ b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/RxJava2PlaysScrabble.java @@ -0,0 +1,163 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow.scrabble; + +import benchmarks.flow.scrabble.IterableSpliterator; +import benchmarks.flow.scrabble.ShakespearePlaysScrabble; +import io.reactivex.Flowable; +import io.reactivex.Maybe; +import io.reactivex.Single; +import io.reactivex.functions.Function; +import org.openjdk.jmh.annotations.*; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +/** + * Shakespeare plays Scrabble with RxJava 2 Flowable. + * @author José + * @author akarnokd + */ +@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +public class RxJava2PlaysScrabble extends ShakespearePlaysScrabble { + + @Benchmark + @Override + public List>> play() throws Exception { + + // Function to compute the score of a given word + Function> scoreOfALetter = letter -> Flowable.just(letterScores[letter - 'a']) ; + + // score of the same letters in a word + Function, Flowable> letterScore = + entry -> + Flowable.just( + letterScores[entry.getKey() - 'a'] * + Integer.min( + (int)entry.getValue().get(), + scrabbleAvailableLetters[entry.getKey() - 'a'] + ) + ) ; + + Function> toIntegerFlowable = + string -> Flowable.fromIterable(IterableSpliterator.of(string.chars().boxed().spliterator())) ; + + // Histogram of the letters in a given word + Function>> histoOfLetters = + word -> toIntegerFlowable.apply(word) + .collect( + () -> new HashMap<>(), + (HashMap map, Integer value) -> + { + LongWrapper newValue = map.get(value) ; + if (newValue == null) { + newValue = () -> 0L ; + } + map.put(value, newValue.incAndSet()) ; + } + + ) ; + + // number of blanks for a given letter + Function, Flowable> blank = + entry -> + Flowable.just( + Long.max( + 0L, + entry.getValue().get() - + scrabbleAvailableLetters[entry.getKey() - 'a'] + ) + ) ; + + // number of blanks for a given word + Function> nBlanks = + word -> histoOfLetters.apply(word) + .flatMapPublisher(map -> Flowable.fromIterable(() -> map.entrySet().iterator())) + .flatMap(blank) + .reduce(Long::sum) ; + + + // can a word be written with 2 blanks? + Function> checkBlanks = + word -> nBlanks.apply(word) + .flatMap(l -> Maybe.just(l <= 2L)) ; + + // score taking blanks into account letterScore1 + Function> score2 = + word -> histoOfLetters.apply(word) + .flatMapPublisher(map -> Flowable.fromIterable(() -> map.entrySet().iterator())) + .flatMap(letterScore) + .reduce(Integer::sum) ; + + // Placing the word on the board + // Building the streams of first and last letters + Function> first3 = + word -> Flowable.fromIterable(IterableSpliterator.of(word.chars().boxed().limit(3).spliterator())) ; + Function> last3 = + word -> Flowable.fromIterable(IterableSpliterator.of(word.chars().boxed().skip(3).spliterator())) ; + + + // Stream to be maxed + Function> toBeMaxed = + word -> Flowable.just(first3.apply(word), last3.apply(word)) + .flatMap(observable -> observable) ; + + // Bonus for double letter + Function> bonusForDoubleLetter = + word -> toBeMaxed.apply(word) + .flatMap(scoreOfALetter) + .reduce(Integer::max) ; + + // score of the word put on the board + Function> score3 = + word -> + Maybe.merge(Arrays.asList( + score2.apply(word), + score2.apply(word), + bonusForDoubleLetter.apply(word), + bonusForDoubleLetter.apply(word), + Maybe.just(word.length() == 7 ? 50 : 0) + ) + ) + .reduce(Integer::sum) ; + + Function>, Single>>> buildHistoOnScore = + score -> Flowable.fromIterable(() -> shakespeareWords.iterator()) + .filter(scrabbleWords::contains) + .filter(word -> checkBlanks.apply(word).blockingGet()) + .collect( + () -> new TreeMap<>(Comparator.reverseOrder()), + (TreeMap> map, String word) -> { + Integer key = score.apply(word).blockingGet() ; + List list = map.get(key) ; + if (list == null) { + list = new ArrayList<>() ; + map.put(key, list) ; + } + list.add(word) ; + } + ) ; + + // best key / value pairs + List>> finalList2 = + buildHistoOnScore.apply(score3) + .flatMapPublisher(map -> Flowable.fromIterable(() -> map.entrySet().iterator())) + .take(3) + .collect( + () -> new ArrayList>>(), + (list, entry) -> { + list.add(entry) ; + } + ) + .blockingGet() ; + return finalList2 ; + } +} \ No newline at end of file diff --git a/benchmarks/src/jmh/java/benchmarks/flow/scrabble/RxJava2PlaysScrabbleOpt.java b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/RxJava2PlaysScrabbleOpt.java new file mode 100644 index 0000000000..7a7cb1aa4e --- /dev/null +++ b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/RxJava2PlaysScrabbleOpt.java @@ -0,0 +1,174 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow.scrabble; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import hu.akarnokd.rxjava2.math.MathFlowable; +import org.openjdk.jmh.annotations.*; +import benchmarks.flow.scrabble.optimizations.*; +import io.reactivex.*; +import io.reactivex.functions.Function; + +/** + * Shakespeare plays Scrabble with RxJava 2 Flowable optimized. + * @author José + * @author akarnokd + */ +@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +public class RxJava2PlaysScrabbleOpt extends ShakespearePlaysScrabble { + static Flowable chars(String word) { +// return Flowable.range(0, word.length()).map(i -> (int)word.charAt(i)); + return StringFlowable.characters(word); + } + + @Benchmark + @Override + public List>> play() throws Exception { + + // to compute the score of a given word + Function scoreOfALetter = letter -> letterScores[letter - 'a']; + + // score of the same letters in a word + Function, Integer> letterScore = + entry -> + letterScores[entry.getKey() - 'a'] * + Integer.min( + (int)entry.getValue().get(), + scrabbleAvailableLetters[entry.getKey() - 'a'] + ) + ; + + + Function> toIntegerFlowable = + string -> chars(string); + + Map>> histoCache = new HashMap<>(); + // Histogram of the letters in a given word + Function>> histoOfLetters = + word -> { Single> s = histoCache.get(word); + if (s == null) { + s = toIntegerFlowable.apply(word) + .collect( + () -> new HashMap<>(), + (HashMap map, Integer value) -> + { + MutableLong newValue = map.get(value) ; + if (newValue == null) { + newValue = new MutableLong(); + map.put(value, newValue); + } + newValue.incAndSet(); + } + + ); + histoCache.put(word, s); + } + return s; + }; + + // number of blanks for a given letter + Function, Long> blank = + entry -> + Long.max( + 0L, + entry.getValue().get() - + scrabbleAvailableLetters[entry.getKey() - 'a'] + ) + ; + + // number of blanks for a given word + Function> nBlanks = + word -> MathFlowable.sumLong( + histoOfLetters.apply(word).flattenAsFlowable( + map -> map.entrySet() + ) + .map(blank) + ) + ; + + + // can a word be written with 2 blanks? + Function> checkBlanks = + word -> nBlanks.apply(word) + .map(l -> l <= 2L) ; + + // score taking blanks into account letterScore1 + Function> score2 = + word -> MathFlowable.sumInt( + histoOfLetters.apply(word).flattenAsFlowable( + map -> map.entrySet() + ) + .map(letterScore) + ) ; + + // Placing the word on the board + // Building the streams of first and last letters + Function> first3 = + word -> chars(word).take(3) ; + Function> last3 = + word -> chars(word).skip(3) ; + + + // Stream to be maxed + Function> toBeMaxed = + word -> Flowable.concat(first3.apply(word), last3.apply(word)) + ; + + // Bonus for double letter + Function> bonusForDoubleLetter = + word -> MathFlowable.max(toBeMaxed.apply(word) + .map(scoreOfALetter) + ) ; + + // score of the word put on the board + Function> score3 = + word -> + MathFlowable.sumInt(Flowable.concat( + score2.apply(word), + bonusForDoubleLetter.apply(word) + )).map(v -> v * 2 + (word.length() == 7 ? 50 : 0)); + + Function>, Single>>> buildHistoOnScore = + score -> Flowable.fromIterable(shakespeareWords) + .filter(scrabbleWords::contains) + .filter(word -> checkBlanks.apply(word).blockingFirst()) + .collect( + () -> new TreeMap>(Comparator.reverseOrder()), + (TreeMap> map, String word) -> { + Integer key = score.apply(word).blockingFirst() ; + List list = map.get(key) ; + if (list == null) { + list = new ArrayList<>() ; + map.put(key, list) ; + } + list.add(word) ; + } + ) ; + + // best key / value pairs + List>> finalList2 = + buildHistoOnScore.apply(score3).flattenAsFlowable( + map -> map.entrySet() + ) + .take(3) + .collect( + () -> new ArrayList>>(), + (list, entry) -> { + list.add(entry) ; + } + ) + .blockingGet(); + + return finalList2 ; + } +} \ No newline at end of file diff --git a/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/FlowableCharSequence.java b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/FlowableCharSequence.java new file mode 100644 index 0000000000..a45dbdd2c5 --- /dev/null +++ b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/FlowableCharSequence.java @@ -0,0 +1,149 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow.scrabble.optimizations; + +import io.reactivex.Flowable; +import io.reactivex.internal.fuseable.QueueFuseable; +import io.reactivex.internal.subscriptions.BasicQueueSubscription; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.BackpressureHelper; +import org.reactivestreams.Subscriber; + +final class FlowableCharSequence extends Flowable { + + final CharSequence string; + + FlowableCharSequence(CharSequence string) { + this.string = string; + } + + @Override + public void subscribeActual(Subscriber s) { + s.onSubscribe(new CharSequenceSubscription(s, string)); + } + + static final class CharSequenceSubscription + extends BasicQueueSubscription { + + private static final long serialVersionUID = -4593793201463047197L; + + final Subscriber downstream; + + final CharSequence string; + + final int end; + + int index; + + volatile boolean cancelled; + + CharSequenceSubscription(Subscriber downstream, CharSequence string) { + this.downstream = downstream; + this.string = string; + this.end = string.length(); + } + + @Override + public void cancel() { + cancelled = true; + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + if (BackpressureHelper.add(this, n) == 0) { + if (n == Long.MAX_VALUE) { + fastPath(); + } else { + slowPath(n); + } + } + } + } + + void fastPath() { + int e = end; + CharSequence s = string; + Subscriber a = downstream; + + for (int i = index; i != e; i++) { + if (cancelled) { + return; + } + + a.onNext((int)s.charAt(i)); + } + + if (!cancelled) { + a.onComplete(); + } + } + + void slowPath(long r) { + long e = 0L; + int i = index; + int f = end; + CharSequence s = string; + Subscriber a = downstream; + + for (;;) { + + while (e != r && i != f) { + if (cancelled) { + return; + } + + a.onNext((int)s.charAt(i)); + + i++; + e++; + } + + if (i == f) { + if (!cancelled) { + a.onComplete(); + } + return; + } + + r = get(); + if (e == r) { + index = i; + r = addAndGet(-e); + if (r == 0L) { + break; + } + e = 0L; + } + } + } + + @Override + public int requestFusion(int requestedMode) { + return requestedMode & QueueFuseable.SYNC; + } + + @Override + public Integer poll() { + int i = index; + if (i != end) { + index = i + 1; + return (int)string.charAt(i); + } + return null; + } + + @Override + public boolean isEmpty() { + return index == end; + } + + @Override + public void clear() { + index = end; + } + } + +} diff --git a/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/FlowableSplit.java b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/FlowableSplit.java new file mode 100644 index 0000000000..83c203e42f --- /dev/null +++ b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/FlowableSplit.java @@ -0,0 +1,327 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow.scrabble.optimizations; + +import io.reactivex.Flowable; +import io.reactivex.FlowableTransformer; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.internal.fuseable.SimplePlainQueue; +import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.plugins.RxJavaPlugins; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + +final class FlowableSplit extends Flowable implements FlowableTransformer { + + final Publisher source; + + final Pattern pattern; + + final int bufferSize; + + FlowableSplit(Publisher source, Pattern pattern, int bufferSize) { + this.source = source; + this.pattern = pattern; + this.bufferSize = bufferSize; + } + + @Override + public Publisher apply(Flowable upstream) { + return new FlowableSplit(upstream, pattern, bufferSize); + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new SplitSubscriber(s, pattern, bufferSize)); + } + + static final class SplitSubscriber + extends AtomicInteger + implements ConditionalSubscriber, Subscription { + + static final String[] EMPTY = new String[0]; + + private static final long serialVersionUID = -5022617259701794064L; + + final Subscriber downstream; + + final Pattern pattern; + + final SimplePlainQueue queue; + + final AtomicLong requested; + + final int bufferSize; + + final int limit; + + Subscription upstream; + + volatile boolean cancelled; + + String leftOver; + + String[] current; + + int index; + + int produced; + + volatile boolean done; + Throwable error; + + int empty; + + SplitSubscriber(Subscriber downstream, Pattern pattern, int bufferSize) { + this.downstream = downstream; + this.pattern = pattern; + this.bufferSize = bufferSize; + this.limit = bufferSize - (bufferSize >> 2); + this.queue = new SpscArrayQueue(bufferSize); + this.requested = new AtomicLong(); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); + } + } + + @Override + public void cancel() { + cancelled = true; + upstream.cancel(); + + if (getAndIncrement() == 0) { + current = null; + queue.clear(); + } + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.upstream, s)) { + this.upstream = s; + + downstream.onSubscribe(this); + + s.request(bufferSize); + } + } + + @Override + public void onNext(String t) { + if (!tryOnNext(t)) { + upstream.request(1); + } + } + + @Override + public boolean tryOnNext(String t) { + String lo = leftOver; + String[] a; + try { + if (lo == null || lo.isEmpty()) { + a = pattern.split(t, -1); + } else { + a = pattern.split(lo + t, -1); + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + this.upstream.cancel(); + onError(ex); + return true; + } + + if (a.length == 0) { + leftOver = null; + return false; + } else + if (a.length == 1) { + leftOver = a[0]; + return false; + } + leftOver = a[a.length - 1]; + queue.offer(a); + drain(); + return true; + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + String lo = leftOver; + if (lo != null && !lo.isEmpty()) { + leftOver = null; + queue.offer(new String[] { lo, null }); + } + error = t; + done = true; + drain(); + } + + @Override + public void onComplete() { + if (!done) { + done = true; + String lo = leftOver; + if (lo != null && !lo.isEmpty()) { + leftOver = null; + queue.offer(new String[] { lo, null }); + } + drain(); + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + SimplePlainQueue q = queue; + + int missed = 1; + int consumed = produced; + String[] array = current; + int idx = index; + int emptyCount = empty; + + Subscriber a = downstream; + + for (;;) { + long r = requested.get(); + long e = 0; + + while (e != r) { + if (cancelled) { + current = null; + q.clear(); + return; + } + + boolean d = done; + + if (array == null) { + array = q.poll(); + if (array != null) { + current = array; + if (++consumed == limit) { + consumed = 0; + upstream.request(limit); + } + } + } + + boolean empty = array == null; + + if (d && empty) { + current = null; + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + return; + } + + if (empty) { + break; + } + + if (array.length == idx + 1) { + array = null; + current = null; + idx = 0; + continue; + } + + String v = array[idx]; + + if (v.isEmpty()) { + emptyCount++; + idx++; + } else { + while (emptyCount != 0 && e != r) { + if (cancelled) { + current = null; + q.clear(); + return; + } + a.onNext(""); + e++; + emptyCount--; + } + + if (e != r && emptyCount == 0) { + a.onNext(v); + + e++; + idx++; + } + } + } + + if (e == r) { + if (cancelled) { + current = null; + q.clear(); + return; + } + + boolean d = done; + + if (array == null) { + array = q.poll(); + if (array != null) { + current = array; + if (++consumed == limit) { + consumed = 0; + upstream.request(limit); + } + } + } + + boolean empty = array == null; + + if (d && empty) { + current = null; + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + return; + } + } + + if (e != 0L) { + BackpressureHelper.produced(requested, e); + } + + empty = emptyCount; + produced = consumed; + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + } +} diff --git a/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/StringFlowable.java b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/StringFlowable.java new file mode 100644 index 0000000000..3d36a0d8e7 --- /dev/null +++ b/benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/StringFlowable.java @@ -0,0 +1,82 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow.scrabble.optimizations; + +import io.reactivex.Flowable; +import io.reactivex.FlowableTransformer; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.plugins.RxJavaPlugins; + +import java.util.regex.Pattern; + +public final class StringFlowable { + /** Utility class. */ + private StringFlowable() { + throw new IllegalStateException("No instances!"); + } + + /** + * Signals each character of the given string CharSequence as Integers. + * @param string the source of characters + * @return the new Flowable instance + */ + public static Flowable characters(CharSequence string) { + ObjectHelper.requireNonNull(string, "string is null"); + return RxJavaPlugins.onAssembly(new FlowableCharSequence(string)); + } + + /** + * Splits the input sequence of strings based on a pattern even across subsequent + * elements if needed. + * @param pattern the Rexexp pattern to split along + * @return the new FlowableTransformer instance + * + * @since 0.13.0 + */ + public static FlowableTransformer split(Pattern pattern) { + return split(pattern, Flowable.bufferSize()); + } + + /** + * Splits the input sequence of strings based on a pattern even across subsequent + * elements if needed. + * @param pattern the Rexexp pattern to split along + * @param bufferSize the number of items to prefetch from the upstream + * @return the new FlowableTransformer instance + * + * @since 0.13.0 + */ + public static FlowableTransformer split(Pattern pattern, int bufferSize) { + ObjectHelper.requireNonNull(pattern, "pattern is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + return new FlowableSplit(null, pattern, bufferSize); + } + + /** + * Splits the input sequence of strings based on a pattern even across subsequent + * elements if needed. + * @param pattern the Rexexp pattern to split along + * @return the new FlowableTransformer instance + * + * @since 0.13.0 + */ + public static FlowableTransformer split(String pattern) { + return split(pattern, Flowable.bufferSize()); + } + + /** + * Splits the input sequence of strings based on a pattern even across subsequent + * elements if needed. + * @param pattern the Rexexp pattern to split along + * @param bufferSize the number of items to prefetch from the upstream + * @return the new FlowableTransformer instance + * + * @since 0.13.0 + */ + public static FlowableTransformer split(String pattern, int bufferSize) { + return split(Pattern.compile(pattern), bufferSize); + } + +} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/Numbers.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/Numbers.kt new file mode 100644 index 0000000000..1eb6fa4c6f --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/Numbers.kt @@ -0,0 +1,123 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + + +package benchmarks.flow.misc + +import benchmarks.flow.scrabble.flow +import io.reactivex.* +import io.reactivex.functions.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.openjdk.jmh.annotations.* +import java.util.concurrent.* + +/* + * Results: + * + * // Throw FlowAborted overhead + * Numbers.primes avgt 7 3039.185 ± 25.598 us/op + * Numbers.primesRx avgt 7 2677.937 ± 17.720 us/op + * + * // On par + * Numbers.transformations avgt 7 16.207 ± 0.133 us/op + * Numbers.transformationsRx avgt 7 19.626 ± 0.135 us/op + * + * // Channels overhead + * Numbers.zip avgt 7 434.160 ± 7.014 us/op + * Numbers.zipRx avgt 7 87.898 ± 5.007 us/op + * + */ +@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +open class Numbers { + + companion object { + private const val primes = 100 + private const val natural = 1000 + } + + private fun numbers() = flow { + for (i in 2L..Long.MAX_VALUE) emit(i) + } + + private fun primesFlow(): Flow = flow { + var source = numbers() + while (true) { + val next = source.take(1).single() + emit(next) + source = source.filter { it % next != 0L } + } + } + + private fun rxNumbers() = + Flowable.generate(Callable { 1L }, BiFunction, Long> { state, emitter -> + val newState = state + 1 + emitter.onNext(newState) + newState + }) + + private fun generateRxPrimes(): Flowable = Flowable.generate(Callable { rxNumbers() }, + BiFunction, Emitter, Flowable> { state, emitter -> + // Not the most fair comparison, but here we go + val prime = state.firstElement().blockingGet() + emitter.onNext(prime) + state.filter { it % prime != 0L } + }) + + @Benchmark + fun primes() = runBlocking { + primesFlow().take(primes).count() + } + + @Benchmark + fun primesRx() = generateRxPrimes().take(primes.toLong()).count().blockingGet() + + @Benchmark + fun zip() = runBlocking { + val numbers = numbers().take(natural) + val first = numbers + .filter { it % 2L != 0L } + .map { it * it } + val second = numbers + .filter { it % 2L == 0L } + .map { it * it } + first.zip(second) { v1, v2 -> v1 + v2 }.filter { it % 3 == 0L }.count() + } + + @Benchmark + fun zipRx() { + val numbers = rxNumbers().take(natural.toLong()) + val first = numbers + .filter { it % 2L != 0L } + .map { it * it } + val second = numbers + .filter { it % 2L == 0L } + .map { it * it } + first.zipWith(second, BiFunction { v1, v2 -> v1 + v2 }).filter { it % 3 == 0L }.count() + .blockingGet() + } + + @Benchmark + fun transformations(): Int = runBlocking { + numbers() + .take(natural) + .filter { it % 2L != 0L } + .map { it * it } + .filter { (it + 1) % 3 == 0L }.count() + } + + @Benchmark + fun transformationsRx(): Long { + return rxNumbers().take(natural.toLong()) + .filter { it % 2L != 0L } + .map { it * it } + .filter { (it + 1) % 3 == 0L }.count() + .blockingGet() + } +} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/SafeFlowBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/SafeFlowBenchmark.kt new file mode 100644 index 0000000000..f62af91eb8 --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/SafeFlowBenchmark.kt @@ -0,0 +1,39 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow.misc + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.openjdk.jmh.annotations.* +import java.util.concurrent.* +import benchmarks.flow.scrabble.flow as unsafeFlow +import kotlinx.coroutines.flow.flow as safeFlow + +@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +open class SafeFlowBenchmark { + + private fun numbersSafe() = safeFlow { + for (i in 2L..1000L) emit(i) + } + + private fun numbersUnsafe() = unsafeFlow { + for (i in 2L..1000L) emit(i) + } + + @Benchmark + fun safeNumbers(): Int = runBlocking { + numbersSafe().count() + } + + @Benchmark + fun unsafeNumbers(): Int = runBlocking { + numbersUnsafe().count() + } +} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleBase.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleBase.kt new file mode 100644 index 0000000000..b556053b5d --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleBase.kt @@ -0,0 +1,134 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow.scrabble + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.openjdk.jmh.annotations.* +import java.lang.Long.* +import java.lang.Long.max +import java.util.* +import java.util.concurrent.* +import kotlin.math.* + +@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +open class FlowPlaysScrabbleBase : ShakespearePlaysScrabble() { + + @Benchmark + public override fun play(): List>> { + val scoreOfALetter = { letter: Int -> flowOf(letterScores[letter - 'a'.toInt()]) } + + val letterScore = { entry: Map.Entry -> + flowOf( + letterScores[entry.key - 'a'.toInt()] * Integer.min( + entry.value.get().toInt(), + scrabbleAvailableLetters[entry.key - 'a'.toInt()] + ) + ) + } + + val toIntegerStream = { string: String -> + IterableSpliterator.of(string.chars().boxed().spliterator()).asFlow() + } + + val histoOfLetters = { word: String -> + flow { + emit(toIntegerStream(word).fold(HashMap()) { accumulator, value -> + var newValue: LongWrapper? = accumulator[value] + if (newValue == null) { + newValue = LongWrapper.zero() + } + accumulator[value] = newValue.incAndSet() + accumulator + }) + } + } + + val blank = { entry: Map.Entry -> + flowOf(max(0L, entry.value.get() - scrabbleAvailableLetters[entry.key - 'a'.toInt()])) + } + + val nBlanks = { word: String -> + flow { + emit(histoOfLetters(word) + .flatMapConcat { map -> map.entries.iterator().asFlow() } + .flatMapConcat({ blank(it) }) + .reduce { a, b -> a + b }) + } + } + + val checkBlanks = { word: String -> + nBlanks(word).flatMapConcat { l -> flowOf(l <= 2L) } + } + + val score2 = { word: String -> + flow { + emit(histoOfLetters(word) + .flatMapConcat { map -> map.entries.iterator().asFlow() } + .flatMapConcat { letterScore(it) } + .reduce { a, b -> a + b }) + } + } + + val first3 = { word: String -> + IterableSpliterator.of(word.chars().boxed().limit(3).spliterator()).asFlow() + } + + val last3 = { word: String -> + IterableSpliterator.of(word.chars().boxed().skip(3).spliterator()).asFlow() + } + + val toBeMaxed = { word: String -> flowOf(first3(word), last3(word)).flattenConcat() } + + // Bonus for double letter + val bonusForDoubleLetter = { word: String -> + flow { + emit(toBeMaxed(word) + .flatMapConcat { scoreOfALetter(it) } + .reduce { a, b -> max(a, b) }) + } + } + + val score3 = { word: String -> + flow { + emit(flowOf( + score2(word), score2(word), + bonusForDoubleLetter(word), + bonusForDoubleLetter(word), + flowOf(if (word.length == 7) 50 else 0) + ).flattenConcat().reduce { a, b -> a + b }) + } + } + + val buildHistoOnScore: (((String) -> Flow) -> Flow>>) = { score -> + flow { + emit(shakespeareWords.asFlow() + .filter({ scrabbleWords.contains(it) && checkBlanks(it).single() }) + .fold(TreeMap>(Collections.reverseOrder())) { acc, value -> + val key = score(value).single() + var list = acc[key] as MutableList? + if (list == null) { + list = ArrayList() + acc[key] = list + } + list.add(value) + acc + }) + } + } + + return runBlocking { + buildHistoOnScore(score3) + .flatMapConcat { map -> map.entries.iterator().asFlow() } + .take(3) + .toList() + } + } +} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt new file mode 100644 index 0000000000..921f390dce --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt @@ -0,0 +1,195 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. and contributors Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow.scrabble + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.openjdk.jmh.annotations.* +import java.lang.Long.max +import java.util.* +import java.util.concurrent.* +import java.util.stream.* +import kotlin.math.* + +@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +open class FlowPlaysScrabbleOpt : ShakespearePlaysScrabble() { + + @Benchmark + public override fun play(): List>> { + val histoOfLetters = { word: String -> + flow { + emit(word.asFlow().fold(HashMap()) { accumulator, value -> + var newValue: MutableLong? = accumulator[value] + if (newValue == null) { + newValue = MutableLong() + accumulator[value] = newValue + } + newValue.incAndSet() + accumulator + }) + } + } + + val blank = { entry: Map.Entry -> + max(0L, entry.value.get() - scrabbleAvailableLetters[entry.key - 'a'.toInt()]) + } + + val nBlanks = { word: String -> + flow { + emit(histoOfLetters(word) + .flatMapConcatIterable { it.entries } + .map({ blank(it) }) + .sum() + ) + } + } + + val checkBlanks = { word: String -> + nBlanks(word).map { it <= 2L } + } + + val letterScore = { entry: Map.Entry -> + letterScores[entry.key - 'a'.toInt()] * Integer.min( + entry.value.get().toInt(), + scrabbleAvailableLetters[entry.key - 'a'.toInt()] + ) + } + + val score2 = { word: String -> + flow { + emit(histoOfLetters(word) + .flatMapConcatIterable { it.entries } + .map { letterScore(it) } + .sum()) + } + } + + val first3 = { word: String -> word.asFlow(endIndex = 3) } + val last3 = { word: String -> word.asFlow(startIndex = 3) } + val toBeMaxed = { word: String -> concat(first3(word), last3(word)) } + + val bonusForDoubleLetter = { word: String -> + flow { + emit(toBeMaxed(word) + .map { letterScores[it.toInt() - 'a'.toInt()] } + .max()) + } + } + + val score3 = { word: String -> + flow { + val sum = score2(word).single() + bonusForDoubleLetter(word).single() + emit(sum * 2 + if (word.length == 7) 50 else 0) + } + } + + val buildHistoOnScore: (((String) -> Flow) -> Flow>>) = { score -> + flow { + emit(shakespeareWords.asFlow() + .filter({ scrabbleWords.contains(it) && checkBlanks(it).single() }) + .fold(TreeMap>(Collections.reverseOrder())) { acc, value -> + val key = score(value).single() + var list = acc[key] as MutableList? + if (list == null) { + list = ArrayList() + acc[key] = list + } + list.add(value) + acc + }) + } + } + + return runBlocking { + buildHistoOnScore(score3) + .flatMapConcatIterable { it.entries } + .take(3) + .toList() + } + } +} + +public fun String.asFlow() = flow { + forEach { + emit(it.toInt()) + } +} + +public fun String.asFlow(startIndex: Int = 0, endIndex: Int = length) = + StringByCharFlow(this, startIndex, endIndex.coerceAtMost(this.length)) + +public suspend inline fun Flow.sum(): Int { + val collector = object : FlowCollector { + public var sum = 0 + + override suspend fun emit(value: Int) { + sum += value + } + } + collect(collector) + return collector.sum +} + +public suspend inline fun Flow.max(): Int { + val collector = object : FlowCollector { + public var max = 0 + + override suspend fun emit(value: Int) { + max = max(max, value) + } + } + collect(collector) + return collector.max +} + +@JvmName("longSum") +public suspend inline fun Flow.sum(): Long { + val collector = object : FlowCollector { + public var sum = 0L + + override suspend fun emit(value: Long) { + sum += value + } + } + collect(collector) + return collector.sum +} + +public class StringByCharFlow(private val source: String, private val startIndex: Int, private val endIndex: Int): Flow { + override suspend fun collect(collector: FlowCollector) { + for (i in startIndex until endIndex) collector.emit(source[i]) + } +} + +public fun concat(first: Flow, second: Flow): Flow = flow { + first.collect { value -> + return@collect emit(value) + } + + second.collect { value -> + return@collect emit(value) + } +} + +public fun Flow.flatMapConcatIterable(transformer: (T) -> Iterable): Flow = flow { + collect { value -> + transformer(value).forEach { r -> + emit(r) + } + } +} + +public inline fun flow(@BuilderInference crossinline block: suspend FlowCollector.() -> Unit): Flow { + return object : Flow { + override suspend fun collect(collector: FlowCollector) { + collector.block() + } + } +} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/IterableSpliterator.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/IterableSpliterator.kt new file mode 100644 index 0000000000..434ea1e19d --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/IterableSpliterator.kt @@ -0,0 +1,12 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow.scrabble + +import java.util.* + +object IterableSpliterator { + @JvmStatic + public fun of(spliterator: Spliterator): Iterable = Iterable { Spliterators.iterator(spliterator) } +} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/README.md b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/README.md new file mode 100644 index 0000000000..13e016fd8b --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/README.md @@ -0,0 +1,42 @@ +## Reactive scrabble benchmarks + +This package contains reactive scrabble benchmarks. + +Reactive Scrabble benchmarks were originally developed by José Paumard and are [available](https://github.com/JosePaumard/jdk8-stream-rx-comparison-reloaded) under Apache 2.0, +Flow version is adaptation of this work. +All Rx and Reactive benchmarks are based on (or copied from) [David Karnok work](https://github.com/akarnokd/akarnokd-misc). + +### Benchmark classes + +The package (split into two sourcesets, `kotlin` and `java`), contains different benchmarks with different purposes + + * `RxJava2PlaysScrabble` and `RxJava2PlaysScrabbleOpt` are copied as is and used for comparison. The infrastructure (e.g. `FlowableSplit`) + is copied from `akarnokd-misc` in order for the latter benchmark to work. + This is the original benchmark for `RxJava`. + * `ReactorPlaysScrabble` is an original benchmark for `Reactor`, but rewritten into Kotlin. + It is disabled by default and had the only purpose -- verify that Kotlin version performs as the original Java version + (which could have been different due to lambdas translation, implicit boxing, etc.). It is disabled because + it has almost no difference compared to `RxJava` benchmark. + * `FlowPlaysScrabbleBase` is a scrabble benchmark rewritten on top of the `Flow` API without using any optimizations or tricky internals. + * `FlowPlaysScrabbleOpt` is an optimized version of benchmark that follows the same guidelines as `RxJava2PlaysScrabbleOpt`: it still is + lazy, reactive and uses only `Flow` abstraction. + * `SequencePlaysScrabble` is a version of benchmark built on top of `Sequence` without suspensions, used as a lower bound. + * `SaneFlowPlaysScrabble` is a `SequencePlaysScrabble` that produces `Flow`. + This benchmark is not identical (in terms of functions pipelining) to `FlowPlaysScrabbleOpt`, but rather is used as a lower bound of `Flow` performance + on this particular task. + +### Results + +Benchmark results for throughput mode, Java `1.8.162`. +Full command: `taskset -c 0,1 java -jar benchmarks.jar -f 2 -jvmArgsPrepend "-XX:+UseParallelGC" .*Scrabble.*`. + +``` +FlowPlaysScrabbleBase.play avgt 14 94.845 ± 1.345 ms/op +FlowPlaysScrabbleOpt.play avgt 14 20.587 ± 0.173 ms/op + +RxJava2PlaysScrabble.play avgt 14 114.253 ± 3.450 ms/op +RxJava2PlaysScrabbleOpt.play avgt 14 30.795 ± 0.144 ms/op + +SaneFlowPlaysScrabble.play avgt 14 18.825 ± 0.231 ms/op +SequencePlaysScrabble.play avgt 14 13.787 ± 0.111 ms/op +``` diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/ReactorPlaysScrabble.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/ReactorPlaysScrabble.kt new file mode 100644 index 0000000000..9adc4f1f59 --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/ReactorPlaysScrabble.kt @@ -0,0 +1,146 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow.scrabble + +import org.openjdk.jmh.annotations.* +import reactor.core.publisher.* +import java.lang.Long.* +import java.util.* +import java.util.concurrent.* +import java.util.function.Function + +/*@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark)*/ +open class ReactorPlaysScrabble : ShakespearePlaysScrabble() { + +// @Benchmark + public override fun play(): List>> { + val scoreOfALetter = Function> { letter -> Flux.just(letterScores[letter - 'a'.toInt()]) } + + val letterScore = Function, Flux> { entry -> + Flux.just( + letterScores[entry.key - 'a'.toInt()] * Integer.min( + entry.value.get().toInt(), + scrabbleAvailableLetters[entry.key - 'a'.toInt()] + ) + ) + } + + val toIntegerStream = Function> { string -> + Flux.fromIterable(IterableSpliterator.of(string.chars().boxed().spliterator())) + } + + val histoOfLetters = Function>> { word -> + Flux.from(toIntegerStream.apply(word) + .collect( + { HashMap() }, + { map: HashMap, value: Int -> + var newValue: LongWrapper? = map[value] + if (newValue == null) { + newValue = LongWrapper.zero() + } + map[value] = newValue.incAndSet() + } + + )) + } + + val blank = Function, Flux> { entry -> + Flux.just(max(0L, entry.value.get() - scrabbleAvailableLetters[entry.key - 'a'.toInt()])) + } + + val nBlanks = Function> { word -> + Flux.from(histoOfLetters.apply(word) + .flatMap> { map -> Flux.fromIterable>(Iterable { map.entries.iterator() }) } + .flatMap(blank) + .reduce { a, b -> sum(a, b) }) + } + + val checkBlanks = Function> { word -> + nBlanks.apply(word) + .flatMap { l -> Flux.just(l <= 2L) } + } + + + val score2 = Function> { word -> + Flux.from(histoOfLetters.apply(word) + .flatMap> { map -> Flux.fromIterable>(Iterable { map.entries.iterator() }) } + .flatMap(letterScore) + .reduce { a, b -> Integer.sum(a, b) }) + + } + + val first3 = Function> { word -> Flux.fromIterable( + IterableSpliterator.of( + word.chars().boxed().limit(3).spliterator() + ) + ) } + val last3 = Function> { word -> Flux.fromIterable( + IterableSpliterator.of( + word.chars().boxed().skip(3).spliterator() + ) + ) } + + val toBeMaxed = Function> { word -> + Flux.just(first3.apply(word), last3.apply(word)) + .flatMap { Stream -> Stream } + } + + // Bonus for double letter + val bonusForDoubleLetter = Function> { word -> + Flux.from(toBeMaxed.apply(word) + .flatMap(scoreOfALetter) + .reduce { a, b -> Integer.max(a, b) } + ) + } + + val score3 = Function> { word -> + Flux.from(Flux.just( + score2.apply(word), + score2.apply(word), + bonusForDoubleLetter.apply(word), + bonusForDoubleLetter.apply(word), + Flux.just(if (word.length == 7) 50 else 0) + ) + .flatMap { Stream -> Stream } + .reduce { a, b -> Integer.sum(a, b) }) + } + + val buildHistoOnScore = Function>, Flux>>> { score -> + Flux.from(Flux.fromIterable(Iterable { shakespeareWords.iterator() }) + .filter( { scrabbleWords.contains(it) }) + .filter({ word -> checkBlanks.apply(word).toIterable().iterator().next() }) + .collect( + { TreeMap>(Collections.reverseOrder()) }, + { map: TreeMap>, word: String -> + val key = score.apply(word).toIterable().iterator().next() + var list = map[key] as MutableList? + if (list == null) { + list = ArrayList() + map[key] = list + } + list.add(word) + } + )) + } + + val finalList2 = Flux.from>>>(buildHistoOnScore.apply(score3) + .flatMap>> { map -> Flux.fromIterable>>(Iterable { map.entries.iterator() }) } + .take(3) + .collect>>>( + { ArrayList() }, + { list, entry -> list.add(entry) } + ) + ).toIterable().iterator().next() + + return finalList2 + } + +} + diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SaneFlowPlaysScrabble.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SaneFlowPlaysScrabble.kt new file mode 100644 index 0000000000..597667c1c6 --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SaneFlowPlaysScrabble.kt @@ -0,0 +1,104 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow.scrabble + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.openjdk.jmh.annotations.* +import java.lang.Long.* +import java.util.* +import java.util.concurrent.* + +@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +open class SaneFlowPlaysScrabble : ShakespearePlaysScrabble() { + + @Benchmark + public override fun play(): List>> { + val score3: suspend (String) -> Int = { word: String -> + val sum = score2(word) + bonusForDoubleLetter(word) + sum * 2 + if (word.length == 7) 50 else 0 + } + + val buildHistoOnScore: ((suspend (String) -> Int) -> Flow>>) = { score -> + flow { + emit(shakespeareWords.asFlow() + .filter({ scrabbleWords.contains(it) && checkBlanks(it) }) + .fold(TreeMap>(Collections.reverseOrder())) { acc, value -> + val key = score(value) + var list = acc[key] as MutableList? + if (list == null) { + list = ArrayList() + acc[key] = list + } + list.add(value) + acc + }) + } + } + + return runBlocking { + buildHistoOnScore(score3) + .flatMapConcatIterable { it.entries } + .take(3) + .toList() + } + } + + private suspend inline fun score2(word: String): Int { + return buildHistogram(word) + .map { it.letterScore() } + .sum() + } + + private suspend inline fun bonusForDoubleLetter(word: String): Int { + return toBeMaxed(word) + .map { letterScores[it - 'a'.toInt()] } + .max() + } + + private fun Map.Entry.letterScore(): Int = letterScores[key - 'a'.toInt()] * Integer.min( + value.get().toInt(), + scrabbleAvailableLetters[key - 'a'.toInt()]) + + private fun toBeMaxed(word: String) = concat(word.asSequence(), word.asSequence(endIndex = 3)) + + private suspend inline fun checkBlanks(word: String) = numBlanks(word) <= 2L + + private suspend fun numBlanks(word: String): Long { + return buildHistogram(word) + .map { blanks(it) } + .sum() + } + + private fun blanks(entry: Map.Entry): Long = + max(0L, entry.value.get() - scrabbleAvailableLetters[entry.key - 'a'.toInt()]) + + private suspend inline fun buildHistogram(word: String): HashMap { + return word.asSequence().fold(HashMap()) { accumulator, value -> + var newValue: MutableLong? = accumulator[value] + if (newValue == null) { + newValue = MutableLong() + accumulator[value] = newValue + } + newValue.incAndSet() + accumulator + } + } + + private fun String.asSequence(startIndex: Int = 0, endIndex: Int = length) = flow { + for (i in startIndex until endIndex.coerceAtMost(length)) { + emit(get(i).toInt()) + } + } +} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt new file mode 100644 index 0000000000..5f4f4c2d1a --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt @@ -0,0 +1,103 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow.scrabble + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.openjdk.jmh.annotations.* +import java.lang.Long.* +import java.util.* +import java.util.concurrent.* + +@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +open class SequencePlaysScrabble : ShakespearePlaysScrabble() { + + @Benchmark + public override fun play(): List>> { + val score2: (String) -> Int = { word: String -> + buildHistogram(word) + .map { it.letterScore() } + .sum() + } + + val bonusForDoubleLetter: (String) -> Int = { word: String -> + toBeMaxed(word) + .map { letterScores[it - 'a'.toInt()] } + .max()!! + } + + val score3: (String) -> Int = { word: String -> + val sum = score2(word) + bonusForDoubleLetter(word) + sum * 2 + if (word.length == 7) 50 else 0 + } + + val buildHistoOnScore: (((String) -> Int) -> Flow>>) = { score -> + flow { + emit(shakespeareWords.asSequence() + .filter({ scrabbleWords.contains(it) && checkBlanks(it) }) + .fold(TreeMap>(Collections.reverseOrder())) { acc, value -> + val key = score(value) + var list = acc[key] as MutableList? + if (list == null) { + list = ArrayList() + acc[key] = list + } + list.add(value) + acc + }) + } + } + + return runBlocking { + buildHistoOnScore(score3) + .flatMapConcatIterable { it.entries } + .take(3) + .toList() + } + } + + private fun Map.Entry.letterScore(): Int = letterScores[key - 'a'.toInt()] * Integer.min( + value.get().toInt(), + scrabbleAvailableLetters[key - 'a'.toInt()]) + + private fun toBeMaxed(word: String) = word.asSequence(startIndex = 3) + word.asSequence(endIndex = 3) + + private fun checkBlanks(word: String) = numBlanks(word) <= 2L + + private fun numBlanks(word: String): Long { + return buildHistogram(word) + .map { blanks(it) } + .sum() + } + + private fun blanks(entry: Map.Entry): Long = + max(0L, entry.value.get() - scrabbleAvailableLetters[entry.key - 'a'.toInt()]) + + private fun buildHistogram(word: String): HashMap { + return word.asSequence().fold(HashMap()) { accumulator, value -> + var newValue: MutableLong? = accumulator[value] + if (newValue == null) { + newValue = MutableLong() + accumulator[value] = newValue + } + newValue.incAndSet() + accumulator + } + } + + private fun String.asSequence(startIndex: Int = 0, endIndex: Int = length) = object : Sequence { + override fun iterator(): Iterator = object : Iterator { + private val _endIndex = endIndex.coerceAtMost(length) + private var currentIndex = startIndex + override fun hasNext(): Boolean = currentIndex < _endIndex + override fun next(): Int = get(currentIndex++).toInt() + } + } +} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/ShakespearePlaysScrabble.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/ShakespearePlaysScrabble.kt new file mode 100644 index 0000000000..7eaa3f0a5d --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/ShakespearePlaysScrabble.kt @@ -0,0 +1,85 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow.scrabble + +import org.openjdk.jmh.annotations.* +import java.io.* +import java.util.stream.* +import java.util.zip.* + +@State(Scope.Benchmark) +abstract class ShakespearePlaysScrabble { + @Throws(Exception::class) + abstract fun play(): List>> + + public class MutableLong { + var value: Long = 0 + fun get(): Long { + return value + } + + fun incAndSet(): MutableLong { + value++ + return this + } + + fun add(other: MutableLong): MutableLong { + value += other.value + return this + } + } + + public interface LongWrapper { + fun get(): Long + + @JvmDefault + fun incAndSet(): LongWrapper { + return object : LongWrapper { + override fun get(): Long = this@LongWrapper.get() + 1L + } + } + + @JvmDefault + fun add(other: LongWrapper): LongWrapper { + return object : LongWrapper { + override fun get(): Long = this@LongWrapper.get() + other.get() + } + } + + companion object { + fun zero(): LongWrapper { + return object : LongWrapper { + override fun get(): Long = 0L + } + } + } + } + + @JvmField + public val letterScores: IntArray = intArrayOf(1, 3, 3, 2, 1, 4, 2, 4, 1, 8, 5, 1, 3, 1, 1, 3, 10, 1, 1, 1, 1, 4, 4, 8, 4, 10) + + @JvmField + public val scrabbleAvailableLetters: IntArray = + intArrayOf(9, 2, 2, 1, 12, 2, 3, 2, 9, 1, 1, 4, 2, 6, 8, 2, 1, 6, 4, 6, 4, 2, 2, 1, 2, 1) + + @JvmField + public val scrabbleWords: Set = readResource("ospd.txt.gz") + + @JvmField + public val shakespeareWords: Set = readResource("words.shakespeare.txt.gz") + + private fun readResource(path: String) = + BufferedReader(InputStreamReader(GZIPInputStream(this.javaClass.classLoader.getResourceAsStream(path)))).lines() + .map { it.toLowerCase() }.collect(Collectors.toSet()) + + init { + val expected = listOf(120 to listOf("jezebel", "quickly"), + 118 to listOf("zephyrs"), 116 to listOf("equinox")) + val actual = play().map { it.key to it.value } + if (expected != actual) { + error("Incorrect benchmark, output: $actual") + } + } +} diff --git a/benchmarks/src/jmh/resources/ospd.txt.gz b/benchmarks/src/jmh/resources/ospd.txt.gz new file mode 100644 index 0000000000..8a3074e927 Binary files /dev/null and b/benchmarks/src/jmh/resources/ospd.txt.gz differ diff --git a/benchmarks/src/jmh/resources/words.shakespeare.txt.gz b/benchmarks/src/jmh/resources/words.shakespeare.txt.gz new file mode 100644 index 0000000000..456f56cbeb Binary files /dev/null and b/benchmarks/src/jmh/resources/words.shakespeare.txt.gz differ diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 366906acd6..558b62ed8e 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -792,7 +792,6 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel; public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel; - public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -814,6 +813,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;II)Lkotlinx/coroutines/flow/Flow; public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun flow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun flowOf (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun flowOf ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/Flow; public static synthetic fun flowOn$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; @@ -821,7 +821,6 @@ public final class kotlinx/coroutines/flow/FlowKt { public static synthetic fun flowViaChannel$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun flowWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun flowWith$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; - public static final fun fold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; diff --git a/gradle.properties b/gradle.properties index bc2a75778a..13b510dcef 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,7 +1,7 @@ # Kotlin version=1.2.1-SNAPSHOT group=org.jetbrains.kotlinx -kotlin_version=1.3.30 +kotlin_version=1.3.31 # Dependencies junit_version=4.12 diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 1416589854..06a5c00e2f 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -58,7 +58,7 @@ public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit */ @FlowPreview @PublishedApi -internal fun unsafeFlow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow { +internal inline fun unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector.() -> Unit): Flow { return object : Flow { override suspend fun collect(collector: FlowCollector) { collector.block() @@ -127,6 +127,18 @@ public fun flowOf(vararg elements: T): Flow = unsafeFlow { } } +/** + * Creates flow that produces a given [value]. + */ +@FlowPreview +public fun flowOf(value: T): Flow = unsafeFlow { + /* + * Implementation note: this is just an "optimized" overload of flowOf(vararg) + * which significantly reduce the footprint of widespread single-value flows. + */ + emit(value) +} + /** * Returns an empty flow. */ @@ -141,7 +153,7 @@ private object EmptyFlow : Flow { * Creates a flow that produces values from the given array. */ @FlowPreview -public fun Array.asFlow(): Flow = flow { +public fun Array.asFlow(): Flow = unsafeFlow { forEach { value -> emit(value) } @@ -151,7 +163,7 @@ public fun Array.asFlow(): Flow = flow { * Creates flow that produces values from the given array. */ @FlowPreview -public fun IntArray.asFlow(): Flow = flow { +public fun IntArray.asFlow(): Flow = unsafeFlow { forEach { value -> emit(value) } @@ -161,7 +173,7 @@ public fun IntArray.asFlow(): Flow = flow { * Creates flow that produces values from the given array. */ @FlowPreview -public fun LongArray.asFlow(): Flow = flow { +public fun LongArray.asFlow(): Flow = unsafeFlow { forEach { value -> emit(value) } @@ -171,7 +183,7 @@ public fun LongArray.asFlow(): Flow = flow { * Creates flow that produces values from the given range. */ @FlowPreview -public fun IntRange.asFlow(): Flow = flow { +public fun IntRange.asFlow(): Flow = unsafeFlow { forEach { value -> emit(value) } diff --git a/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt similarity index 64% rename from kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.kt rename to kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt index e432c18f8c..6d5a4b4d4d 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt @@ -10,7 +10,4 @@ import kotlinx.coroutines.* * This exception is thrown when operator need no more elements from the flow. * This exception should never escape outside of operator's implementation. */ -internal class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") { - // TODO expect/actual - // override fun fillInStackTrace(): Throwable = this -} \ No newline at end of file +internal expect class AbortFlowException() : CancellationException diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt index 3dc021b635..17c1a4c4d6 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt @@ -50,7 +50,7 @@ public fun Flow.flowOn(flowContext: CoroutineContext, bufferSize: Int = 1 coroutineScope { val channel = produce(flowContext, capacity = bufferSize) { collect { value -> - send(value) + return@collect send(value) } } channel.consumeEach { value -> @@ -98,7 +98,7 @@ public fun Flow.flowWith( val originalContext = coroutineContext.minusKey(Job) val prepared = source.flowOn(originalContext, bufferSize) builder(prepared).flowOn(flowContext, bufferSize).collect { value -> - emit(value) + return@collect emit(value) } } } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt index 3818efeb16..aff523dd99 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt @@ -26,10 +26,11 @@ import kotlinx.coroutines.flow.unsafeFlow as flow * ``` */ @FlowPreview -public fun Flow.transform(@BuilderInference transform: suspend FlowCollector.(value: T) -> Unit): Flow { +public inline fun Flow.transform(@BuilderInference crossinline transform: suspend FlowCollector.(value: T) -> Unit): Flow { return flow { collect { value -> - transform(value) + // kludge, without it Unit will be returned and TCE won't kick in, KT-28938 + return@collect transform(value) } } } @@ -38,9 +39,9 @@ public fun Flow.transform(@BuilderInference transform: suspend FlowCol * Returns a flow containing only values of the original flow that matches the given [predicate]. */ @FlowPreview -public fun Flow.filter(predicate: suspend (T) -> Boolean): Flow = flow { +public inline fun Flow.filter(crossinline predicate: suspend (T) -> Boolean): Flow = flow { collect { value -> - if (predicate(value)) emit(value) + if (predicate(value)) return@collect emit(value) } } @@ -48,9 +49,9 @@ public fun Flow.filter(predicate: suspend (T) -> Boolean): Flow = flow * Returns a flow containing only values of the original flow that do not match the given [predicate]. */ @FlowPreview -public fun Flow.filterNot(predicate: suspend (T) -> Boolean): Flow = flow { +public inline fun Flow.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow = flow { collect { value -> - if (!predicate(value)) emit(value) + if (!predicate(value)) return@collect emit(value) } } @@ -66,24 +67,24 @@ public inline fun Flow<*>.filterIsInstance(): Flow = filter { it */ @FlowPreview public fun Flow.filterNotNull(): Flow = flow { - collect { value -> if (value != null) emit(value) } + collect { value -> if (value != null) return@collect emit(value) } } /** * Returns a flow containing the results of applying the given [transform] function to each value of the original flow. */ @FlowPreview -public fun Flow.map(transform: suspend (value: T) -> R): Flow = transform { value -> - emit(transform(value)) +public inline fun Flow.map(crossinline transform: suspend (value: T) -> R): Flow = transform { value -> + return@transform emit(transform(value)) } /** * Returns a flow that contains only non-null results of applying the given [transform] function to each value of the original flow. */ @FlowPreview -public fun Flow.mapNotNull(transform: suspend (value: T) -> R?): Flow = transform { value -> +public inline fun Flow.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow = transform { value -> val transformed = transform(value) ?: return@transform - emit(transformed) + return@transform emit(transformed) } /** diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt index d0e04ff286..624b51f683 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt @@ -28,7 +28,7 @@ import kotlin.jvm.* * ``` */ @FlowPreview -public suspend fun Flow.collect(action: suspend (value: T) -> Unit): Unit = +public suspend inline fun Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector { override suspend fun emit(value: T) = action(value) }) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index ac3c93cc0d..4afd0959b7 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -38,9 +38,9 @@ public suspend fun Flow.reduce(operation: suspend (accumulator: S, * Accumulates value starting with [initial] value and applying [operation] current accumulator value and each element */ @FlowPreview -public suspend fun Flow.fold( +public suspend inline fun Flow.fold( initial: R, - operation: suspend (acc: R, value: T) -> R + crossinline operation: suspend (acc: R, value: T) -> R ): R { var accumulator = initial collect { value -> diff --git a/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt new file mode 100644 index 0000000000..d6a9c31eaa --- /dev/null +++ b/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt @@ -0,0 +1,9 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.* + +internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt new file mode 100644 index 0000000000..7ff34e735b --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt @@ -0,0 +1,11 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.* + +internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") { + override fun fillInStackTrace(): Throwable = this +} diff --git a/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt new file mode 100644 index 0000000000..d6a9c31eaa --- /dev/null +++ b/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt @@ -0,0 +1,9 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.* + +internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") diff --git a/kotlinx-coroutines-debug/test/CoroutinesDumpTest.kt b/kotlinx-coroutines-debug/test/CoroutinesDumpTest.kt index 35e9d1e9f2..ec727014cb 100644 --- a/kotlinx-coroutines-debug/test/CoroutinesDumpTest.kt +++ b/kotlinx-coroutines-debug/test/CoroutinesDumpTest.kt @@ -24,7 +24,6 @@ class CoroutinesDumpTest : DebugTestBase() { "Coroutine \"coroutine#1\":DeferredCoroutine{Active}@1e4a7dd4, state: SUSPENDED\n" + "\tat kotlinx.coroutines.debug.CoroutinesDumpTest.sleepingNestedMethod(CoroutinesDumpTest.kt:95)\n" + "\tat kotlinx.coroutines.debug.CoroutinesDumpTest.sleepingOuterMethod(CoroutinesDumpTest.kt:88)\n" + - "\tat kotlinx.coroutines.debug.CoroutinesDumpTest\$testSuspendedCoroutine\$1\$deferred\$1.invokeSuspend(CoroutinesDumpTest.kt:29)\n" + "\t(Coroutine creation stacktrace)\n" + "\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)\n" + "\tat kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:23)\n" + @@ -39,13 +38,13 @@ class CoroutinesDumpTest : DebugTestBase() { fun testRunningCoroutine() = synchronized(monitor) { val deferred = GlobalScope.async { activeMethod(shouldSuspend = false) + assertTrue(true) } awaitCoroutineStarted() verifyDump( - "Coroutine \"coroutine#1\":DeferredCoroutine{Active}@1e4a7dd4, state: RUNNING (Last suspension stacktrace, not an actual stacktrace)\n" + - "\tat kotlinx.coroutines.debug.CoroutinesDumpTest\$testRunningCoroutine\$1\$deferred\$1.invokeSuspend(CoroutinesDumpTest.kt:49)\n" + - "\t(Coroutine creation stacktrace)\n" + + "Coroutine \"coroutine#1\":DeferredCoroutine{Active}@227d9994, state: RUNNING (Last suspension stacktrace, not an actual stacktrace)\n" + + "\t(Coroutine creation stacktrace)\n" + "\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)\n" + "\tat kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:23)\n" + "\tat kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:99)\n" + @@ -72,7 +71,6 @@ class CoroutinesDumpTest : DebugTestBase() { "\tat java.lang.Thread.sleep(Native Method)\n" + "\tat kotlinx.coroutines.debug.CoroutinesDumpTest.nestedActiveMethod(CoroutinesDumpTest.kt:111)\n" + "\tat kotlinx.coroutines.debug.CoroutinesDumpTest.activeMethod(CoroutinesDumpTest.kt:106)\n" + - "\tat kotlinx.coroutines.debug.CoroutinesDumpTest\$testRunningCoroutineWithSuspensionPoint\$1\$deferred\$1.invokeSuspend(CoroutinesDumpTest.kt:71)\n" + "\t(Coroutine creation stacktrace)\n" + "\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)\n" + "\tat kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:23)\n" + diff --git a/kotlinx-coroutines-debug/test/DebugProbesTest.kt b/kotlinx-coroutines-debug/test/DebugProbesTest.kt index 9dd4d7cec0..35d024178d 100644 --- a/kotlinx-coroutines-debug/test/DebugProbesTest.kt +++ b/kotlinx-coroutines-debug/test/DebugProbesTest.kt @@ -45,7 +45,6 @@ class DebugProbesTest : TestBase() { "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt)\n" + "\tat kotlinx.coroutines.debug.DebugProbesTest.oneMoreNestedMethod(DebugProbesTest.kt:71)\n" + "\tat kotlinx.coroutines.debug.DebugProbesTest.nestedMethod(DebugProbesTest.kt:66)\n" + - "\tat kotlinx.coroutines.debug.DebugProbesTest\$testAsyncWithProbes\$1\$1.invokeSuspend(DebugProbesTest.kt:43)\n" + "\t(Coroutine creation stacktrace)\n" + "\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)\n" + "\tat kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:23)\n" + @@ -76,7 +75,6 @@ class DebugProbesTest : TestBase() { "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt)\n" + "\tat kotlinx.coroutines.debug.DebugProbesTest.oneMoreNestedMethod(DebugProbesTest.kt:71)\n" + "\tat kotlinx.coroutines.debug.DebugProbesTest.nestedMethod(DebugProbesTest.kt:66)\n" + - "\tat kotlinx.coroutines.debug.DebugProbesTest\$testAsyncWithSanitizedProbes\$1\$1.invokeSuspend(DebugProbesTest.kt:43)\n" + "\t(Coroutine creation stacktrace)\n" + "\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)\n" + "\tat kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:23)\n" + diff --git a/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt b/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt index 3ee80ad38f..c990c3e085 100644 --- a/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt +++ b/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt @@ -86,7 +86,6 @@ class SanitizedProbesTest : DebugTestBase() { "\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)", "Coroutine \"coroutine#2\":StandaloneCoroutine{Active}@1b68b9a4, state: SUSPENDED\n" + - "\tat definitely.not.kotlinx.coroutines.SanitizedProbesTest\$launchSelector\$1\$1\$1.invokeSuspend(SanitizedProbesTest.kt:105)\n" + "\tat definitely.not.kotlinx.coroutines.SanitizedProbesTest\$launchSelector\$1.invokeSuspend(SanitizedProbesTest.kt:143)\n" + "\t(Coroutine creation stacktrace)\n" + "\tat kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:116)\n" + diff --git a/ui/kotlinx-coroutines-android/animation-app/gradle.properties b/ui/kotlinx-coroutines-android/animation-app/gradle.properties index bd2f170459..342b103ab2 100644 --- a/ui/kotlinx-coroutines-android/animation-app/gradle.properties +++ b/ui/kotlinx-coroutines-android/animation-app/gradle.properties @@ -18,6 +18,6 @@ org.gradle.jvmargs=-Xmx1536m kotlin.coroutines=enable -kotlin_version=1.3.30 +kotlin_version=1.3.31 coroutines_version=1.2.1 diff --git a/ui/kotlinx-coroutines-android/example-app/gradle.properties b/ui/kotlinx-coroutines-android/example-app/gradle.properties index bd2f170459..342b103ab2 100644 --- a/ui/kotlinx-coroutines-android/example-app/gradle.properties +++ b/ui/kotlinx-coroutines-android/example-app/gradle.properties @@ -18,6 +18,6 @@ org.gradle.jvmargs=-Xmx1536m kotlin.coroutines=enable -kotlin_version=1.3.30 +kotlin_version=1.3.31 coroutines_version=1.2.1