1 /* 2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import kotlinx.coroutines.flow.* 8 import kotlinx.coroutines.internal.* 9 import java.io.* 10 import java.util.concurrent.* 11 import kotlin.coroutines.* 12 13 /** 14 * [CoroutineDispatcher] that has underlying [Executor] for dispatching tasks. 15 * Instances of [ExecutorCoroutineDispatcher] should be closed by the owner of the dispatcher. 16 * 17 * This class is generally used as a bridge between coroutine-based API and 18 * asynchronous API that requires an instance of the [Executor]. 19 */ 20 public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable { 21 /** @suppress */ 22 @ExperimentalStdlibApi 23 public companion object Key : AbstractCoroutineContextKey<CoroutineDispatcher, ExecutorCoroutineDispatcher>( 24 CoroutineDispatcher, <lambda>null25 { it as? ExecutorCoroutineDispatcher }) 26 27 /** 28 * Underlying executor of current [CoroutineDispatcher]. 29 */ 30 public abstract val executor: Executor 31 32 /** 33 * Closes this coroutine dispatcher and shuts down its executor. 34 * 35 * It may throw an exception if this dispatcher is global and cannot be closed. 36 */ closenull37 public abstract override fun close() 38 } 39 40 @ExperimentalCoroutinesApi 41 public actual typealias CloseableCoroutineDispatcher = ExecutorCoroutineDispatcher 42 43 /** 44 * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher]. 45 * 46 * ## Interaction with [delay] and time-based coroutines. 47 * 48 * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related 49 * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled 50 * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding 51 * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. 52 * 53 * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, 54 * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order 55 * to reduce the memory pressure of cancelled coroutines. 56 * 57 * If the executor service is neither of this types, the separate internal thread will be used to 58 * _track_ the delay and time-related executions, but the coroutine itself will still be executed 59 * on top of the given executor. 60 * 61 * ## Rejected execution 62 * If the underlying executor throws [RejectedExecutionException] on 63 * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the 64 * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), 65 * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the 66 * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. 67 */ 68 @JvmName("from") // this is for a nice Java API, see issue #255 69 public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher = 70 ExecutorCoroutineDispatcherImpl(this) 71 72 /** 73 * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher]. 74 * 75 * ## Interaction with [delay] and time-based coroutines. 76 * 77 * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related 78 * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled 79 * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding 80 * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. 81 * 82 * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, 83 * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order 84 * to reduce the memory pressure of cancelled coroutines. 85 * 86 * If the executor is neither of this types, the separate internal thread will be used to 87 * _track_ the delay and time-related executions, but the coroutine itself will still be executed 88 * on top of the given executor. 89 * 90 * ## Rejected execution 91 * 92 * If the underlying executor throws [RejectedExecutionException] on 93 * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the 94 * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), 95 * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the 96 * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. 97 */ 98 @JvmName("from") // this is for a nice Java API, see issue #255 99 public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher = 100 (this as? DispatcherExecutor)?.dispatcher ?: ExecutorCoroutineDispatcherImpl(this) 101 102 /** 103 * Converts an instance of [CoroutineDispatcher] to an implementation of [Executor]. 104 * 105 * It returns the original executor when used on the result of [Executor.asCoroutineDispatcher] extensions. 106 */ 107 public fun CoroutineDispatcher.asExecutor(): Executor = 108 (this as? ExecutorCoroutineDispatcher)?.executor ?: DispatcherExecutor(this) 109 110 private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) : Executor { 111 override fun execute(block: Runnable) { 112 if (dispatcher.isDispatchNeeded(EmptyCoroutineContext)) { 113 dispatcher.dispatch(EmptyCoroutineContext, block) 114 } else { 115 block.run() 116 } 117 } 118 119 override fun toString(): String = dispatcher.toString() 120 } 121 122 internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay { 123 124 /* 125 * Attempts to reflectively (to be Java 6 compatible) invoke 126 * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup 127 * internal scheduler queue on cancellation. 128 */ 129 init { 130 removeFutureOnCancel(executor) 131 } 132 dispatchnull133 override fun dispatch(context: CoroutineContext, block: Runnable) { 134 try { 135 executor.execute(wrapTask(block)) 136 } catch (e: RejectedExecutionException) { 137 unTrackTask() 138 cancelJobOnRejection(context, e) 139 Dispatchers.IO.dispatch(context, block) 140 } 141 } 142 scheduleResumeAfterDelaynull143 override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { 144 val future = (executor as? ScheduledExecutorService)?.scheduleBlock( 145 ResumeUndispatchedRunnable(this, continuation), 146 continuation.context, 147 timeMillis 148 ) 149 // If everything went fine and the scheduling attempt was not rejected -- use it 150 if (future != null) { 151 continuation.cancelFutureOnCancellation(future) 152 return 153 } 154 // Otherwise fallback to default executor 155 DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation) 156 } 157 invokeOnTimeoutnull158 override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { 159 val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis) 160 return when { 161 future != null -> DisposableFutureHandle(future) 162 else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) 163 } 164 } 165 ScheduledExecutorServicenull166 private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { 167 return try { 168 schedule(block, timeMillis, TimeUnit.MILLISECONDS) 169 } catch (e: RejectedExecutionException) { 170 cancelJobOnRejection(context, e) 171 null 172 } 173 } 174 cancelJobOnRejectionnull175 private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) { 176 context.cancel(CancellationException("The task was rejected", exception)) 177 } 178 closenull179 override fun close() { 180 (executor as? ExecutorService)?.shutdown() 181 } 182 toStringnull183 override fun toString(): String = executor.toString() 184 override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor 185 override fun hashCode(): Int = System.identityHashCode(executor) 186 } 187 188 private class ResumeUndispatchedRunnable( 189 private val dispatcher: CoroutineDispatcher, 190 private val continuation: CancellableContinuation<Unit> 191 ) : Runnable { 192 override fun run() { 193 with(continuation) { dispatcher.resumeUndispatched(Unit) } 194 } 195 } 196 197 /** 198 * An implementation of [DisposableHandle] that cancels the specified future on dispose. 199 * @suppress **This is unstable API and it is subject to change.** 200 */ 201 private class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle { disposenull202 override fun dispose() { 203 future.cancel(false) 204 } toStringnull205 override fun toString(): String = "DisposableFutureHandle[$future]" 206 } 207