Skip to content

Introduce SharedFlow and sharing operators #2069

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 32 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6e7aa7b
Introduce SharedFlow and sharing operators
elizarov May 8, 2020
67ec501
Merge branch 'develop' into shared-flow
elizarov Oct 9, 2020
3028679
~ Experimental resetReplayCache
elizarov Oct 9, 2020
8c83d34
~ No default for shareIn/stateIn started parameter
elizarov Oct 9, 2020
4196a1a
~ Deprecate Flow.broadcastIn
elizarov Oct 12, 2020
f67bbb7
~ Rephrased SharingStarted docs
elizarov Oct 12, 2020
6567aee
~ Added version to hidden declarations
elizarov Oct 12, 2020
c349fde
~ Private val flow in CancellableFlowImpl
elizarov Oct 12, 2020
1658064
~ Build: testNG task should be triggered by check, not test
elizarov Oct 12, 2020
cdb3162
~ Additional PublisherAsFlow tests for asFlow().buffer(...)
elizarov Oct 12, 2020
f0e7c6a
~ Fixed StartedWhileSubscribed.hashCode
elizarov Oct 12, 2020
3e9b12f
~ Optimized sharedIn(Lazily/Eagerly), removed DistinctFlow abstraction
elizarov Oct 12, 2020
de97928
~ Fix new DistinctUntilChanged tests on K/N
elizarov Oct 12, 2020
06cfa80
~ Bit more details in subscriptionCount docs
elizarov Oct 12, 2020
4735501
~ ArrayChannel params checks with assert
elizarov Oct 12, 2020
e86a111
~ Revert ArrayChannel params checks with assert, added comment
elizarov Oct 12, 2020
9120f28
~ Improve SharedFlow & StateFlow example code
elizarov Oct 12, 2020
cc03f3a
~ More eager cancellation check in StateFlow, better test
elizarov Oct 12, 2020
322f615
~ Updated API dump
elizarov Oct 12, 2020
22e4800
~ Optimized resume list to array
elizarov Oct 12, 2020
faaad4f
~ Optimized delay(Long.MAX_VALUE), SharingStarted docs
elizarov Oct 12, 2020
56ff4fb
~ Moved AbstractSharedFlow to internal package
elizarov Oct 12, 2020
17c8fac
~ More detailed docs on StateFlow.compareAndSet and test
elizarov Oct 12, 2020
25e08d4
~ Fixed Robolectric test for optimized delay impl
elizarov Oct 12, 2020
fe64780
~ Added comment on how freeSlot can resume coroutines
elizarov Oct 12, 2020
a0f7666
~ Code style
elizarov Oct 12, 2020
1900c9c
Merge remote-tracking branch 'origin/develop' into shared-flow
elizarov Oct 12, 2020
0fa4de6
~ One more fix for delay(MAX_VALUE) optimization
elizarov Oct 12, 2020
996e618
~ Fixed distinctUntilChanged on Kotlin/Native
elizarov Oct 12, 2020
bd32090
~ Tweaked defaults and parameter order for shareIn
elizarov Oct 13, 2020
db7e689
~ Code style
elizarov Oct 13, 2020
5db8a5d
~ Text typo
elizarov Oct 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ suspend fun main() = coroutineScope {
* [DebugProbes] API to probe, keep track of, print and dump active coroutines;
* [CoroutinesTimeout] test rule to automatically dump coroutines on test timeout.
* [reactive](reactive/README.md) — modules that provide builders and iteration support for various reactive streams libraries:
* Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [publish], etc),
* Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [kotlinx.coroutines.reactive.publish], etc),
* Flow (JDK 9) (the same interface as for Reactive Streams),
* RxJava 2.x ([rxFlowable], [rxSingle], etc), and
* RxJava 3.x ([rxFlowable], [rxSingle], etc), and
Expand Down Expand Up @@ -302,7 +302,7 @@ The `develop` branch is pushed to `master` during release.
<!--- INDEX kotlinx.coroutines.reactive -->
[Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html
[Publisher.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-single.html
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
[kotlinx.coroutines.reactive.publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
<!--- MODULE kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.rx2 -->
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
Expand Down
91 changes: 78 additions & 13 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,14 @@ public final class kotlinx/coroutines/channels/BroadcastKt {
public static synthetic fun broadcast$default (Lkotlinx/coroutines/channels/ReceiveChannel;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
}

public final class kotlinx/coroutines/channels/BufferOverflow : java/lang/Enum {
public static final field DROP_LATEST Lkotlinx/coroutines/channels/BufferOverflow;
public static final field DROP_OLDEST Lkotlinx/coroutines/channels/BufferOverflow;
public static final field SUSPEND Lkotlinx/coroutines/channels/BufferOverflow;
public static fun valueOf (Ljava/lang/String;)Lkotlinx/coroutines/channels/BufferOverflow;
public static fun values ()[Lkotlinx/coroutines/channels/BufferOverflow;
}

public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/coroutines/channels/ReceiveChannel, kotlinx/coroutines/channels/SendChannel {
public static final field BUFFERED I
public static final field CONFLATED I
Expand Down Expand Up @@ -611,9 +619,9 @@ public final class kotlinx/coroutines/channels/ChannelIterator$DefaultImpls {

public final class kotlinx/coroutines/channels/ChannelKt {
public static final synthetic fun Channel (I)Lkotlinx/coroutines/channels/Channel;
public static final fun Channel (ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/channels/Channel;
public static final fun Channel (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (IILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
}

public final class kotlinx/coroutines/channels/ChannelsKt {
Expand Down Expand Up @@ -868,7 +876,7 @@ public final class kotlinx/coroutines/debug/internal/DebuggerInfo : java/io/Seri
public final fun getState ()Ljava/lang/String;
}

public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/Flow {
public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/CancellableFlow, kotlinx/coroutines/flow/Flow {
public fun <init> ()V
public final fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand All @@ -895,10 +903,15 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun asFlow ([I)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([J)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun asSharedFlow (Lkotlinx/coroutines/flow/MutableSharedFlow;)Lkotlinx/coroutines/flow/SharedFlow;
public static final fun asStateFlow (Lkotlinx/coroutines/flow/MutableStateFlow;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static final fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final synthetic fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun buffer (Lkotlinx/coroutines/flow/Flow;ILkotlinx/coroutines/channels/BufferOverflow;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun buffer$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun buffer$default (Lkotlinx/coroutines/flow/Flow;ILkotlinx/coroutines/channels/BufferOverflow;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun cache (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun cancellable (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -988,10 +1001,15 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun onErrorReturn (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun onErrorReturn$default (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun onStart (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onSubscription (Lkotlinx/coroutines/flow/SharedFlow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/SharedFlow;
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun publish (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun publish (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun receiveAsFlow (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/flow/Flow;
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun replay (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun replay (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final synthetic fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun retry (Lkotlinx/coroutines/flow/Flow;JLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand All @@ -1003,11 +1021,14 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun scan (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun scanReduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun shareIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/flow/SharingStarted;)Lkotlinx/coroutines/flow/SharedFlow;
public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun stateIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun stateIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;Ljava/lang/Object;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V
Expand All @@ -1031,19 +1052,61 @@ public final class kotlinx/coroutines/flow/FlowKt {
public final class kotlinx/coroutines/flow/LintKt {
public static final fun cancel (Lkotlinx/coroutines/flow/FlowCollector;Ljava/util/concurrent/CancellationException;)V
public static synthetic fun cancel$default (Lkotlinx/coroutines/flow/FlowCollector;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
public static final fun cancellable (Lkotlinx/coroutines/flow/SharedFlow;)Lkotlinx/coroutines/flow/Flow;
public static final fun conflate (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOn (Lkotlinx/coroutines/flow/StateFlow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOn (Lkotlinx/coroutines/flow/SharedFlow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun getCoroutineContext (Lkotlinx/coroutines/flow/FlowCollector;)Lkotlin/coroutines/CoroutineContext;
public static final fun isActive (Lkotlinx/coroutines/flow/FlowCollector;)Z
}

public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotlinx/coroutines/flow/StateFlow {
public abstract interface class kotlinx/coroutines/flow/MutableSharedFlow : kotlinx/coroutines/flow/FlowCollector, kotlinx/coroutines/flow/SharedFlow {
public abstract fun getSubscriptionCount ()Lkotlinx/coroutines/flow/StateFlow;
public abstract fun resetReplayCache ()V
public abstract fun tryEmit (Ljava/lang/Object;)Z
}

public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotlinx/coroutines/flow/MutableSharedFlow, kotlinx/coroutines/flow/StateFlow {
public abstract fun compareAndSet (Ljava/lang/Object;Ljava/lang/Object;)Z
public abstract fun getValue ()Ljava/lang/Object;
public abstract fun setValue (Ljava/lang/Object;)V
}

public abstract interface class kotlinx/coroutines/flow/StateFlow : kotlinx/coroutines/flow/Flow {
public abstract interface class kotlinx/coroutines/flow/SharedFlow : kotlinx/coroutines/flow/Flow {
public abstract fun getReplayCache ()Ljava/util/List;
}

public final class kotlinx/coroutines/flow/SharedFlowKt {
public static final fun MutableSharedFlow (IILkotlinx/coroutines/channels/BufferOverflow;)Lkotlinx/coroutines/flow/MutableSharedFlow;
public static synthetic fun MutableSharedFlow$default (IILkotlinx/coroutines/channels/BufferOverflow;ILjava/lang/Object;)Lkotlinx/coroutines/flow/MutableSharedFlow;
}

public final class kotlinx/coroutines/flow/SharingCommand : java/lang/Enum {
public static final field START Lkotlinx/coroutines/flow/SharingCommand;
public static final field STOP Lkotlinx/coroutines/flow/SharingCommand;
public static final field STOP_AND_RESET_REPLAY_CACHE Lkotlinx/coroutines/flow/SharingCommand;
public static fun valueOf (Ljava/lang/String;)Lkotlinx/coroutines/flow/SharingCommand;
public static fun values ()[Lkotlinx/coroutines/flow/SharingCommand;
}

public abstract interface class kotlinx/coroutines/flow/SharingStarted {
public static final field Companion Lkotlinx/coroutines/flow/SharingStarted$Companion;
public abstract fun command (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
}

public final class kotlinx/coroutines/flow/SharingStarted$Companion {
public final fun WhileSubscribed (JJ)Lkotlinx/coroutines/flow/SharingStarted;
public static synthetic fun WhileSubscribed$default (Lkotlinx/coroutines/flow/SharingStarted$Companion;JJILjava/lang/Object;)Lkotlinx/coroutines/flow/SharingStarted;
public final fun getEagerly ()Lkotlinx/coroutines/flow/SharingStarted;
public final fun getLazily ()Lkotlinx/coroutines/flow/SharingStarted;
}

public final class kotlinx/coroutines/flow/SharingStartedKt {
public static final fun WhileSubscribed-9tZugJw (Lkotlinx/coroutines/flow/SharingStarted$Companion;DD)Lkotlinx/coroutines/flow/SharingStarted;
public static synthetic fun WhileSubscribed-9tZugJw$default (Lkotlinx/coroutines/flow/SharingStarted$Companion;DDILjava/lang/Object;)Lkotlinx/coroutines/flow/SharingStarted;
}

public abstract interface class kotlinx/coroutines/flow/StateFlow : kotlinx/coroutines/flow/SharedFlow {
public abstract fun getValue ()Ljava/lang/Object;
}

Expand All @@ -1054,13 +1117,15 @@ public final class kotlinx/coroutines/flow/StateFlowKt {
public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/internal/FusibleFlow {
public final field capacity I
public final field context Lkotlin/coroutines/CoroutineContext;
public fun <init> (Lkotlin/coroutines/CoroutineContext;I)V
public fun additionalToStringProps ()Ljava/lang/String;
public final field onBufferOverflow Lkotlinx/coroutines/channels/BufferOverflow;
public fun <init> (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)V
protected fun additionalToStringProps ()Ljava/lang/String;
public fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
public fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow;
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
public fun dropChannelOperators ()Lkotlinx/coroutines/flow/Flow;
public fun fuse (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)Lkotlinx/coroutines/flow/Flow;
public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
public fun toString ()Ljava/lang/String;
}
Expand All @@ -1074,11 +1139,11 @@ public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
}

public abstract interface class kotlinx/coroutines/flow/internal/FusibleFlow : kotlinx/coroutines/flow/Flow {
public abstract fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow;
public abstract fun fuse (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)Lkotlinx/coroutines/flow/Flow;
}

public final class kotlinx/coroutines/flow/internal/FusibleFlow$DefaultImpls {
public static synthetic fun fuse$default (Lkotlinx/coroutines/flow/internal/FusibleFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/FusibleFlow;
public static synthetic fun fuse$default (Lkotlinx/coroutines/flow/internal/FusibleFlow;Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
}

public final class kotlinx/coroutines/flow/internal/SafeCollector_commonKt {
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/CompletableDeferred.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import kotlinx.coroutines.selects.*
* All functions on this interface are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
*
* **`CompletableDeferred` interface is not stable for inheritance in 3rd party libraries**,
* **The `CompletableDeferred` interface is not stable for inheritance in 3rd party libraries**,
* as new methods might be added to this interface in the future, but is stable for use.
*/
public interface CompletableDeferred<T> : Deferred<T> {
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/CompletableJob.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package kotlinx.coroutines
* All functions on this interface are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
*
* **`CompletableJob` interface is not stable for inheritance in 3rd party libraries**,
* **The `CompletableJob` interface is not stable for inheritance in 3rd party libraries**,
* as new methods might be added to this interface in the future, but is stable for use.
*/
public interface CompletableJob : Job {
Expand Down
5 changes: 4 additions & 1 deletion kotlinx-coroutines-core/common/src/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {}
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}

Expand Down
10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1019,23 +1019,23 @@ internal val EMPTY = Symbol("EMPTY") // marker for Conflated & Buffered channels

@JvmField
@SharedImmutable
internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
internal val OFFER_SUCCESS = Symbol("OFFER_SUCCESS")

@JvmField
@SharedImmutable
internal val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
internal val OFFER_FAILED = Symbol("OFFER_FAILED")

@JvmField
@SharedImmutable
internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
internal val POLL_FAILED = Symbol("POLL_FAILED")

@JvmField
@SharedImmutable
internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
internal val ENQUEUE_FAILED = Symbol("ENQUEUE_FAILED")

@JvmField
@SharedImmutable
internal val HANDLER_INVOKED: Any = Symbol("ON_CLOSE_HANDLER_INVOKED")
internal val HANDLER_INVOKED = Symbol("ON_CLOSE_HANDLER_INVOKED")

internal typealias Handler = (Throwable?) -> Unit

Expand Down
Loading