Skip to content

Commit b77a80c

Browse files
committed
Flow: decouple buffer size from various operators and fuse
* Introduce buffer operator * Remove buffer size from all the other operators * Fuse all adjacent operators that create channels * Introduce Channel.BUFFERED buffer size marker to request buffered channel with a default (unspecified) size Fixes #1233
1 parent b73ebdc commit b77a80c

File tree

19 files changed

+924
-276
lines changed

19 files changed

+924
-276
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+18-14
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,9 @@ public final class kotlinx/coroutines/channels/BroadcastKt {
542542
}
543543

544544
public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/coroutines/channels/ReceiveChannel, kotlinx/coroutines/channels/SendChannel {
545+
public static final field BUFFERED I
545546
public static final field CONFLATED I
547+
public static final field DEFAULT_BUFFER_PROPERTY_NAME Ljava/lang/String;
546548
public static final field Factory Lkotlinx/coroutines/channels/Channel$Factory;
547549
public static final field RENDEZVOUS I
548550
public static final field UNLIMITED I
@@ -553,7 +555,9 @@ public final class kotlinx/coroutines/channels/Channel$DefaultImpls {
553555
}
554556

555557
public final class kotlinx/coroutines/channels/Channel$Factory {
558+
public static final field BUFFERED I
556559
public static final field CONFLATED I
560+
public static final field DEFAULT_BUFFER_PROPERTY_NAME Ljava/lang/String;
557561
public static final field RENDEZVOUS I
558562
public static final field UNLIMITED I
559563
}
@@ -785,6 +789,7 @@ public abstract interface class kotlinx/coroutines/flow/FlowCollector {
785789
}
786790

787791
public final class kotlinx/coroutines/flow/FlowKt {
792+
public static final field DEFAULT_CONCURRENCY_PROPERTY_NAME Ljava/lang/String;
788793
public static final fun asFlow (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow;
789794
public static final fun asFlow (Ljava/util/Iterator;)Lkotlinx/coroutines/flow/Flow;
790795
public static final fun asFlow (Lkotlin/jvm/functions/Function0;)Lkotlinx/coroutines/flow/Flow;
@@ -796,12 +801,12 @@ public final class kotlinx/coroutines/flow/FlowKt {
796801
public static final fun asFlow ([I)Lkotlinx/coroutines/flow/Flow;
797802
public static final fun asFlow ([J)Lkotlinx/coroutines/flow/Flow;
798803
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
799-
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
800-
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
801-
public static final fun callbackFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
802-
public static synthetic fun callbackFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
803-
public static final fun channelFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
804-
public static synthetic fun channelFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
804+
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
805+
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
806+
public static final fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
807+
public static synthetic fun buffer$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
808+
public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
809+
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
805810
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
806811
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
807812
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
@@ -821,29 +826,28 @@ public final class kotlinx/coroutines/flow/FlowKt {
821826
public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
822827
public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
823828
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
824-
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
825-
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
829+
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
830+
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
826831
public static final fun flattenConcat (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
827-
public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;II)Lkotlinx/coroutines/flow/Flow;
828-
public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
832+
public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
833+
public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
829834
public static final fun flow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
830835
public static final fun flowOf (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
831836
public static final fun flowOf ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
832-
public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/Flow;
833-
public static synthetic fun flowOn$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
837+
public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
834838
public static final fun flowViaChannel (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
835839
public static synthetic fun flowViaChannel$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
836840
public static final fun flowWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
837841
public static synthetic fun flowWith$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
842+
public static final fun getDEFAULT_CONCURRENCY ()I
838843
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
839844
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
840845
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
841846
public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
842847
public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
843848
public static final fun onErrorReturn (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
844849
public static synthetic fun onErrorReturn$default (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
845-
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;I)Lkotlinx/coroutines/channels/ReceiveChannel;
846-
public static synthetic fun produceIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
850+
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
847851
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
848852
public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
849853
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt

+5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ package kotlinx.coroutines.channels
88

99
import kotlinx.coroutines.*
1010
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
11+
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
12+
import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
1113
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1214

1315
/**
@@ -50,9 +52,11 @@ public interface BroadcastChannel<E> : SendChannel<E> {
5052
* Creates a broadcast channel with the specified buffer capacity.
5153
*
5254
* The resulting channel type depends on the specified [capacity] parameter:
55+
*
5356
* * when `capacity` positive, but less than [UNLIMITED] -- creates `ArrayBroadcastChannel` with a buffer of given capacity.
5457
* **Note:** this channel looses all items that are send to it until the first subscriber appears;
5558
* * when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
59+
* * when `capacity` is [BUFFERED] -- creates `ArrayBroadcastChannel` with a default capacity.
5660
* * otherwise -- throws [IllegalArgumentException].
5761
*
5862
* **Note: This is an experimental api.** It may be changed in the future updates.
@@ -63,5 +67,6 @@ public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
6367
0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
6468
UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
6569
CONFLATED -> ConflatedBroadcastChannel()
70+
BUFFERED -> ArrayBroadcastChannel(CHANNEL_DEFAULT_CAPACITY)
6671
else -> ArrayBroadcastChannel(capacity)
6772
}

kotlinx-coroutines-core/common/src/channels/Channel.kt

+27-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import kotlinx.coroutines.*
1010
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
1111
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
1212
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
13+
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
14+
import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
15+
import kotlinx.coroutines.internal.systemProp
1316
import kotlinx.coroutines.selects.*
1417
import kotlin.jvm.*
1518

@@ -372,20 +375,43 @@ public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
372375
* Requests conflated channel in `Channel(...)` factory function -- the `ConflatedChannel` gets created.
373376
*/
374377
public const val CONFLATED = -1
378+
379+
/**
380+
* Requests buffered channel with a default buffer capacity in `Channel(...)` factory function --
381+
* the `ArrayChannel` gets created with a default capacity.
382+
* This capacity is equal to 16 by default and can be overridden by setting
383+
* [DEFAULT_BUFFER_PROPERTY_NAME] on JVM.
384+
*/
385+
public const val BUFFERED = -2
386+
387+
// only for internal use, cannot be used with Channel(...)
388+
internal const val OPTIONAL_CHANNEL = -3
389+
390+
/**
391+
* Name of the property that defines the default channel capacity when
392+
* [BUFFERED] is used as parameter in `Channel(...)` factory function.
393+
*/
394+
public const val DEFAULT_BUFFER_PROPERTY_NAME = "kotlinx.coroutines.channels.defaultBuffer"
395+
396+
internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
397+
16, 1, UNLIMITED - 1
398+
)
375399
}
376400
}
377401

378402
/**
379403
* Creates a channel with the specified buffer capacity (or without a buffer by default).
380404
* See [Channel] interface documentation for details.
381405
*
382-
* @throws IllegalArgumentException when [capacity] < -1
406+
* @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory].
407+
* @throws IllegalArgumentException when [capacity] < -2
383408
*/
384409
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
385410
when (capacity) {
386411
RENDEZVOUS -> RendezvousChannel()
387412
UNLIMITED -> LinkedListChannel()
388413
CONFLATED -> ConflatedChannel()
414+
BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY)
389415
else -> ArrayChannel(capacity)
390416
}
391417

0 commit comments

Comments
 (0)