You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Let's say you have a list and you want to perform some function asynchronously on and return a list from that. It would be convenient to have a function that lets you map things asynchronously without having to right out all of the logic for creating a list of Deferred<T>.
You may also want to do async calls in order and then get the result of a transformed list.
// inside a coroutine
val list: List<Int> = (0..5).toList()
val transformedMap = list.deferredMap(Dispatchers.IO) {
delay(100L - it * 10L)
println(it)
}
transformMap.await()
/* output
5
4
3
2
1
0
*/
The Shape of the API
/**
* Map a list of items to a list of deferred items. The deferred items are then awaited and the resulting list is returned.
* @param coroutineScope The coroutine scope to use for the async operations
* @param chunkSize the number of coroutines to run at the same time asynchronously. The iterable is taken and split into chunks of this amount
* @param throwException if true, when any exception happens in any transform, the exception will be thrown
* @param stopChunkOnException if true, when any exception happens in any transform, the remaining chunks will not be processed
* @param transform the function to transform each item
* @return the list of deferred transformed items
* @throws Exception if any exception happens in any transform and throwException is true
*/
suspend fun <I, T> Iterable<I>.deferredMap(
coroutineScope: CoroutineScope,
chunkSize: Int? = null,
throwException: Boolean = false,
stopChunkOnException: Boolean = false,
noinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>>
/**
* Map a list of items to a list of transformed items using coroutines in order
* @param coroutineScope the scope in which to run the coroutines
* @param throwException if true, when any exception happens in any transform, the exception will be thrown
* @param stopOnException if true, when any exception happens in any transform, the rest of the remaining transforms will not be executed
* @param transform the function to transform each item
* @return the list of transformed items
*/
suspend inline fun <I, T> Iterable<I>.toAsyncInOrderMap(
coroutineScope: CoroutineScope,
throwException: Boolean = false,
stopOnException: Boolean = false,
crossinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>>
Possible Implementation
I've taken a crack at this before so I thought it could be helpful. Here is a possible implementation. Obviously, there might be things I may be missing so I would appreciate the feedback or let me know if this even makes sense to have!
suspend inline fun <I, T> Iterable<I>.deferredMap(
coroutineScope: CoroutineScope,
chunkSize: Int? = null,
throwException: Boolean = false,
stopChunkOnException: Boolean = false,
noinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>> {
val listTransformer: (Iterable<I>) -> List<Deferred<(() -> T)?>> =
if (throwException) {
{ list ->
val items: List<Deferred<() -> T>> =
list.map {
coroutineScope.async {
val transformed = transform(it)
val wrappedTransformed: () -> T = { transformed }
wrappedTransformed
}
}
items
}
} else {
{ list ->
val items: List<Deferred<(() -> T)?>> =
list.map {
coroutineScope.async {
try {
val transformed = transform(it)
val wrappedTransformed: () -> T = { transformed }
wrappedTransformed
} catch (e: Exception) {
null
}
}
}
items
}
}
return this.toAsyncMap(
coroutineScope = coroutineScope,
chunkSize = chunkSize,
catchChunkedException = false,
stopChunkOnException = stopChunkOnException,
wrappedTransformer = listTransformer,
)
}
inline fun <I, T> Iterable<I>.toAsyncMap(
coroutineScope: CoroutineScope,
chunkSize: Int?,
catchChunkedException: Boolean,
stopChunkOnException: Boolean,
crossinline wrappedTransformer: suspend (Iterable<I>) -> List<Deferred<(() -> T)?>>,
): Deferred<List<T>> = if (chunkSize == null) {
coroutineScope.async {
wrappedTransformer(this@toAsyncMap).awaitAll().mapNotNull { it }.map { it() }
}
} else {
coroutineScope.async {
val transformedList = mutableListOf<(() -> T)>()
[email protected](chunkSize)
.toAsyncInOrderMap(
coroutineScope = this,
throwException = catchChunkedException,
stopOnException = stopChunkOnException,
) { portion: List<I> ->
val transformedPortion: List<(() -> T)> = wrappedTransformer(portion).awaitAll().mapNotNull { it }
transformedList.addAll(transformedPortion)
if (portion.size > transformedPortion.size) {
throw IllegalStateException("failed to transform all items in list")
}
transformedPortion
}
transformedList.map { it() }
}
}
suspend inline fun <I, T> Iterable<I>.toAsyncInOrderMap(
coroutineScope: CoroutineScope,
throwException: Boolean = false,
stopOnException: Boolean = false,
crossinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>> {
return coroutineScope.async {
val destination = ArrayList<T>()
if (throwException) {
for (item in this@toAsyncInOrderMap) {
destination += coroutineScope.transform(item)
}
} else {
for (item in this@toAsyncInOrderMap) {
try {
destination += coroutineScope.transform(item)
} catch (e: Exception) {
if (stopOnException) return@async destination
}
}
}
destination
}
}
The text was updated successfully, but these errors were encountered:
Use case
Let's say you have a list and you want to perform some function asynchronously on and return a list from that. It would be convenient to have a function that lets you map things asynchronously without having to right out all of the logic for creating a list of
Deferred<T>
.You may also want to do async calls in order and then get the result of a transformed list.
The Shape of the API
Possible Implementation
I've taken a crack at this before so I thought it could be helpful. Here is a possible implementation. Obviously, there might be things I may be missing so I would appreciate the feedback or let me know if this even makes sense to have!
The text was updated successfully, but these errors were encountered: