|
| 1 | +package org.oppia.android.scripts.common |
| 2 | + |
| 3 | +import kotlinx.coroutines.CoroutineDispatcher |
| 4 | +import kotlinx.coroutines.Runnable |
| 5 | +import kotlinx.coroutines.asCoroutineDispatcher |
| 6 | +import java.io.Closeable |
| 7 | +import java.util.concurrent.ExecutorService |
| 8 | +import java.util.concurrent.Executors |
| 9 | +import java.util.concurrent.TimeUnit |
| 10 | +import kotlin.coroutines.CoroutineContext |
| 11 | + |
| 12 | +/** |
| 13 | + * A [CoroutineDispatcher] that's [Closeable] and particularly tailored to be easily used in scripts |
| 14 | + * that need to perform parallel tasks for expensive IO. It's highly recommended to exclusively use |
| 15 | + * this dispatcher over any others, and to ensure that [close] is called at the end of the script to |
| 16 | + * avoid any potential threads hanging (causing the script to not actually close). |
| 17 | + * |
| 18 | + * Note that the dispatcher attempts to finish any ongoing tasks when [close] is called, but it will |
| 19 | + * reject new tasks from being scheduled and it will force terminate if any pending tasks at the |
| 20 | + * time of closing don't end within the configured [closeTimeout] provided. |
| 21 | + * |
| 22 | + * A simple example for using this dispatcher: |
| 23 | + * ```kotlin |
| 24 | + * ScriptBackgroundCoroutineDispatcher().use { scriptBgDispatcher -> |
| 25 | + * val deferred = CoroutineScope(scriptBgDispatcher).async { |
| 26 | + * // Expensive task... |
| 27 | + * } |
| 28 | + * // IMPORTANT: The operation must be observed before use{} ends, otherwise the dispatcher will |
| 29 | + * // close and terminate any pending tasks. |
| 30 | + * runBlocking { deferred.await() } |
| 31 | + * } |
| 32 | + * ``` |
| 33 | + * |
| 34 | + * A more complex example for I/O operations: |
| 35 | + * ```kotlin |
| 36 | + * ScriptBackgroundCoroutineDispatcher().use { scriptBgDispatcher -> |
| 37 | + * val deferred = CoroutineScope(scriptBgDispatcher).async { |
| 38 | + * withContext(Dispatchers.IO) { |
| 39 | + * // Perform I/O using Kotlin's highly parallelized I/O dispatcher, but wait for the result |
| 40 | + * // using the background script dispatcher (since execution could continue if other I/O |
| 41 | + * // operations need to be kicked off, or if other work can be done alongside the I/O). |
| 42 | + * } |
| 43 | + * } |
| 44 | + * // IMPORTANT: The operation must be observed before use{} ends, otherwise the dispatcher will |
| 45 | + * // close and terminate any pending tasks. |
| 46 | + * runBlocking { deferred.await() } |
| 47 | + * } |
| 48 | + * ``` |
| 49 | + * |
| 50 | + * @property closeTimeout the amount of time, in [closeTimeoutUnit] units, that should be waited |
| 51 | + * when [close]ing this dispatcher before force-ending ongoing tasks |
| 52 | + * @property closeTimeoutUnit the unit of time used for [closeTimeout] |
| 53 | + */ |
| 54 | +class ScriptBackgroundCoroutineDispatcher( |
| 55 | + private val closeTimeout: Long = 5, |
| 56 | + private val closeTimeoutUnit: TimeUnit = TimeUnit.SECONDS |
| 57 | +) : CoroutineDispatcher(), Closeable { |
| 58 | + private val threadPool by lazy { Executors.newCachedThreadPool() } |
| 59 | + private val coroutineDispatcher by lazy { threadPool.asCoroutineDispatcher() } |
| 60 | + |
| 61 | + override fun dispatch(context: CoroutineContext, block: Runnable) { |
| 62 | + coroutineDispatcher.dispatch(context, block) |
| 63 | + } |
| 64 | + |
| 65 | + override fun close() { |
| 66 | + threadPool.tryShutdownFully(timeout = closeTimeout, unit = closeTimeoutUnit) |
| 67 | + coroutineDispatcher.close() |
| 68 | + } |
| 69 | + |
| 70 | + private companion object { |
| 71 | + private fun ExecutorService.tryShutdownFully(timeout: Long, unit: TimeUnit) { |
| 72 | + // Try to fully shutdown the executor service per https://stackoverflow.com/a/33690603 and |
| 73 | + // https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html. |
| 74 | + shutdown() |
| 75 | + try { |
| 76 | + if (!awaitTermination(timeout, unit)) { |
| 77 | + shutdownNow() |
| 78 | + check(awaitTermination(timeout, unit)) { "ExecutorService didn't fully shutdown: $this." } |
| 79 | + } |
| 80 | + } catch (e: InterruptedException) { |
| 81 | + shutdownNow() |
| 82 | + Thread.currentThread().interrupt() |
| 83 | + } |
| 84 | + } |
| 85 | + } |
| 86 | +} |
0 commit comments