Skip to content

Mapping a list asynchronously #4221

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
singhmanu opened this issue Sep 7, 2024 · 2 comments
Closed

Mapping a list asynchronously #4221

singhmanu opened this issue Sep 7, 2024 · 2 comments

Comments

@singhmanu
Copy link

Use case

Let's say you have a list and you want to perform some function asynchronously on and return a list from that. It would be convenient to have a function that lets you map things asynchronously without having to right out all of the logic for creating a list of Deferred<T>.

You may also want to do async calls in order and then get the result of a transformed list.

// inside a coroutine
val list: List<Int> = (0..5).toList()
val transformedMap = list.deferredMap(Dispatchers.IO) {
  delay(100L - it * 10L)
  println(it)
}
transformMap.await()
/* output
5
4
3
2
1
0
*/

The Shape of the API

/**
 * Map a list of items to a list of deferred items. The deferred items are then awaited and the resulting list is returned.
 * @param coroutineScope The coroutine scope to use for the async operations
 * @param chunkSize the number of coroutines to run at the same time asynchronously. The iterable is taken and split into chunks of this amount
 * @param throwException if true, when any exception happens in any transform, the exception will be thrown
 * @param stopChunkOnException if true, when any exception happens in any transform, the remaining chunks will not be processed
 * @param transform the function to transform each item
 * @return the list of deferred transformed items
 * @throws Exception if any exception happens in any transform and throwException is true
 */
suspend fun <I, T> Iterable<I>.deferredMap(
    coroutineScope: CoroutineScope,
    chunkSize: Int? = null,
    throwException: Boolean = false,
    stopChunkOnException: Boolean = false,
    noinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>>

/**
 * Map a list of items to a list of transformed items using coroutines in order
 * @param coroutineScope the scope in which to run the coroutines
 * @param throwException if true, when any exception happens in any transform, the exception will be thrown
 * @param stopOnException if true, when any exception happens in any transform, the rest of the remaining transforms will not be executed
 * @param transform the function to transform each item
 * @return the list of transformed items
 */
suspend inline fun <I, T> Iterable<I>.toAsyncInOrderMap(
    coroutineScope: CoroutineScope,
    throwException: Boolean = false,
    stopOnException: Boolean = false,
    crossinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>>

Possible Implementation

I've taken a crack at this before so I thought it could be helpful. Here is a possible implementation. Obviously, there might be things I may be missing so I would appreciate the feedback or let me know if this even makes sense to have!

suspend inline fun <I, T> Iterable<I>.deferredMap(
    coroutineScope: CoroutineScope,
    chunkSize: Int? = null,
    throwException: Boolean = false,
    stopChunkOnException: Boolean = false,
    noinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>> {
    val listTransformer: (Iterable<I>) -> List<Deferred<(() -> T)?>> =
        if (throwException) {
            { list ->
                val items: List<Deferred<() -> T>> =
                    list.map {
                        coroutineScope.async {
                            val transformed = transform(it)
                            val wrappedTransformed: () -> T = { transformed }
                            wrappedTransformed
                        }
                    }
                items
            }
        } else {
            { list ->
                val items: List<Deferred<(() -> T)?>> =
                    list.map {
                        coroutineScope.async {
                            try {
                                val transformed = transform(it)
                                val wrappedTransformed: () -> T = { transformed }
                                wrappedTransformed
                            } catch (e: Exception) {
                                null
                            }
                        }
                    }
                items
            }
        }
    return this.toAsyncMap(
        coroutineScope = coroutineScope,
        chunkSize = chunkSize,
        catchChunkedException = false,
        stopChunkOnException = stopChunkOnException,
        wrappedTransformer = listTransformer,
    )
}

inline fun <I, T> Iterable<I>.toAsyncMap(
    coroutineScope: CoroutineScope,
    chunkSize: Int?,
    catchChunkedException: Boolean,
    stopChunkOnException: Boolean,
    crossinline wrappedTransformer: suspend (Iterable<I>) -> List<Deferred<(() -> T)?>>,
): Deferred<List<T>> = if (chunkSize == null) {
    coroutineScope.async {
        wrappedTransformer(this@toAsyncMap).awaitAll().mapNotNull { it }.map { it() }
    }
} else {
    coroutineScope.async {
        val transformedList = mutableListOf<(() -> T)>()
        [email protected](chunkSize)
            .toAsyncInOrderMap(
                coroutineScope = this,
                throwException = catchChunkedException,
                stopOnException = stopChunkOnException,
            ) { portion: List<I> ->
                val transformedPortion: List<(() -> T)> = wrappedTransformer(portion).awaitAll().mapNotNull { it }
                transformedList.addAll(transformedPortion)
                if (portion.size > transformedPortion.size) {
                    throw IllegalStateException("failed to transform all items in list")
                }
                transformedPortion
            }
        transformedList.map { it() }
    }
}

suspend inline fun <I, T> Iterable<I>.toAsyncInOrderMap(
    coroutineScope: CoroutineScope,
    throwException: Boolean = false,
    stopOnException: Boolean = false,
    crossinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>> {
    return coroutineScope.async {
        val destination = ArrayList<T>()
        if (throwException) {
            for (item in this@toAsyncInOrderMap) {
                destination += coroutineScope.transform(item)
            }
        } else {
            for (item in this@toAsyncInOrderMap) {
                try {
                    destination += coroutineScope.transform(item)
                } catch (e: Exception) {
                    if (stopOnException) return@async destination
                }
            }
        }
        destination
    }
}
@JakeWharton
Copy link
Contributor

This has been discussed in #1147, under the assumption that you'd turn the list into a Flow, map in parallel, and then reconstitute a list.

@singhmanu
Copy link
Author

Thanks. Ill take a look.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants