/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
import kotlin.coroutines.*

/**
 * [CoroutineDispatcher] that has underlying [Executor] for dispatching tasks.
 * Instances of [ExecutorCoroutineDispatcher] should be closed by the owner of the dispatcher.
 *
 * This class is generally used as a bridge between coroutine-based API and
 * asynchronous API that requires an instance of the [Executor].
 */
public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
    /** @suppress */
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<CoroutineDispatcher, ExecutorCoroutineDispatcher>(
        CoroutineDispatcher,
        { it as? ExecutorCoroutineDispatcher })

    /**
     * Underlying executor of current [CoroutineDispatcher].
     */
    public abstract val executor: Executor

    /**
     * Closes this coroutine dispatcher and shuts down its executor.
     *
     * It may throw an exception if this dispatcher is global and cannot be closed.
     */
    public abstract override fun close()
}

@ExperimentalCoroutinesApi
public actual typealias CloseableCoroutineDispatcher = ExecutorCoroutineDispatcher

/**
 * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
 *
 * ## Interaction with [delay] and time-based coroutines.
 *
 * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related
 * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
 * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
 * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
 *
 * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
 * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
 * to reduce the memory pressure of cancelled coroutines.
 *
 * If the executor service is neither of these types, a separate internal thread will be used to
 * _track_ the delay and time-related executions, but the coroutine itself will still be executed
 * on top of the given executor.
 *
 * ## Rejected execution
 *
 * If the underlying executor throws [RejectedExecutionException] on
 * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
 * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
 * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
 * [Dispatchers.IO], so that the affected coroutine can clean up its resources and promptly complete.
 */
@JvmName("from") // this is for a nice Java API, see issue #255
public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher =
    ExecutorCoroutineDispatcherImpl(this)

/**
 * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
 *
 * When the receiver was itself produced by [CoroutineDispatcher.asExecutor], the original dispatcher
 * is returned instead of creating a new wrapper.
 *
 * ## Interaction with [delay] and time-based coroutines.
 *
 * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related
 * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
 * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
 * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
 *
 * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
 * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
 * to reduce the memory pressure of cancelled coroutines.
 *
 * If the executor is neither of these types, a separate internal thread will be used to
 * _track_ the delay and time-related executions, but the coroutine itself will still be executed
 * on top of the given executor.
 *
 * ## Rejected execution
 *
 * If the underlying executor throws [RejectedExecutionException] on
 * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
 * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
 * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
 * [Dispatchers.IO], so that the affected coroutine can clean up its resources and promptly complete.
 */
@JvmName("from") // this is for a nice Java API, see issue #255
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
    (this as? DispatcherExecutor)?.dispatcher ?: ExecutorCoroutineDispatcherImpl(this)

/**
 * Converts an instance of [CoroutineDispatcher] to an implementation of [Executor].
 *
 * It returns the original executor when used on the result of [Executor.asCoroutineDispatcher] extensions.
 *
 * For any other dispatcher the returned executor submits tasks through [CoroutineDispatcher.dispatch]
 * with an [EmptyCoroutineContext]. If the dispatcher reports that no dispatch is needed
 * (its [CoroutineDispatcher.isDispatchNeeded] returns `false`), the task is run in place
 * on the thread that called [Executor.execute].
 */
public fun CoroutineDispatcher.asExecutor(): Executor =
    (this as? ExecutorCoroutineDispatcher)?.executor ?: DispatcherExecutor(this)

/**
 * [Executor] view over an arbitrary [CoroutineDispatcher].
 *
 * The wrapped [dispatcher] is exposed so that [Executor.asCoroutineDispatcher] can unwrap it
 * instead of stacking adapters.
 */
private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) : Executor {
    /**
     * Runs [block] on the wrapped [dispatcher].
     *
     * `dispatch` is only invoked when the dispatcher asks for it via `isDispatchNeeded`;
     * otherwise the block is executed in place. Calling `dispatch` unconditionally breaks
     * dispatchers that are not meant to be dispatched to directly (e.g. [Dispatchers.Unconfined])
     * and forces a redundant re-dispatch on "immediate" dispatchers.
     */
    override fun execute(block: Runnable) {
        if (dispatcher.isDispatchNeeded(EmptyCoroutineContext)) {
            dispatcher.dispatch(EmptyCoroutineContext, block)
        } else {
            block.run()
        }
    }

    override fun toString(): String = dispatcher.toString()
}

/**
 * Default [ExecutorCoroutineDispatcher] implementation backed by the given [executor].
 *
 * It also implements [Delay]: timed operations are scheduled on [executor] when it is a
 * [ScheduledExecutorService] and fall back to `DefaultExecutor` otherwise (or when scheduling is rejected).
 *
 * Two instances are equal when they wrap the very same executor instance (reference equality).
 */
internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {

    /*
     * Attempts to reflectively (to be Java 6 compatible) invoke
     * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to clean up
     * internal scheduler queue on cancellation.
     */
    init {
        removeFutureOnCancel(executor)
    }

    /**
     * Submits [block] to the underlying [executor].
     *
     * If the executor rejects the task, the job in [context] is cancelled and [block] is
     * handed over to [Dispatchers.IO] so the coroutine still gets a chance to complete.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            executor.execute(wrapTask(block))
        } catch (e: RejectedExecutionException) {
            // NOTE(review): presumably undoes the bookkeeping started by wrapTask for the
            // task that never ran -- confirm against the time-source helpers.
            unTrackTask()
            cancelJobOnRejection(context, e)
            Dispatchers.IO.dispatch(context, block)
        }
    }

    /**
     * Schedules resumption of [continuation] after [timeMillis] milliseconds.
     *
     * Uses the underlying [executor] when it is a [ScheduledExecutorService] and accepts the task;
     * otherwise delegates to `DefaultExecutor`.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
            ResumeUndispatchedRunnable(this, continuation),
            continuation.context,
            timeMillis
        )
        // If everything went fine and the scheduling attempt was not rejected -- use it
        if (future != null) {
            continuation.cancelFutureOnCancellation(future)
            return
        }
        // Otherwise fallback to default executor
        DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
    }

    /**
     * Schedules [block] to run after [timeMillis] milliseconds and returns a handle that cancels it.
     *
     * Uses the underlying [executor] when it is a [ScheduledExecutorService] and accepts the task;
     * otherwise delegates to `DefaultExecutor`.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
        return when {
            future != null -> DisposableFutureHandle(future)
            else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
        }
    }

    /**
     * Schedules [block] on the receiver after [timeMillis] milliseconds.
     *
     * Returns the resulting future, or `null` if the executor rejected the task; in the latter
     * case the job in [context] has already been cancelled.
     */
    private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
        return try {
            schedule(block, timeMillis, TimeUnit.MILLISECONDS)
        } catch (e: RejectedExecutionException) {
            cancelJobOnRejection(context, e)
            null
        }
    }

    /** Cancels [context] with a [CancellationException] that carries the rejection as its cause. */
    private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) {
        context.cancel(CancellationException("The task was rejected", exception))
    }

    /** Shuts down the underlying [executor] if it is an [ExecutorService]; otherwise does nothing. */
    override fun close() {
        (executor as? ExecutorService)?.shutdown()
    }

    override fun toString(): String = executor.toString()
    override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor
    override fun hashCode(): Int = System.identityHashCode(executor)
}

/**
 * [Runnable] that resumes [continuation] with [Unit] through `resumeUndispatched` on [dispatcher].
 */
private class ResumeUndispatchedRunnable(
    private val dispatcher: CoroutineDispatcher,
    private val continuation: CancellableContinuation<Unit>
) : Runnable {
    override fun run() {
        with(continuation) { dispatcher.resumeUndispatched(Unit) }
    }
}

/**
 * An implementation of [DisposableHandle] that cancels the specified future on dispose.
 * @suppress **This is unstable API and it is subject to change.**
 */
private class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
    override fun dispose() {
        // mayInterruptIfRunning = false: a task that is already running is not interrupted
        future.cancel(false)
    }

    override fun toString(): String = "DisposableFutureHandle[$future]"
}