/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
import kotlin.coroutines.*

/**
 * A [CoroutineDispatcher] backed by an underlying [Executor] that is used for dispatching tasks.
 * Instances of [ExecutorCoroutineDispatcher] should be closed by the owner of the dispatcher.
 *
 * This class is generally used as a bridge between coroutine-based API and
 * asynchronous API that requires an instance of the [Executor].
 */
public abstract class ExecutorCoroutineDispatcher : CoroutineDispatcher(), Closeable {
    /** @suppress */
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<CoroutineDispatcher, ExecutorCoroutineDispatcher>(
        CoroutineDispatcher,
        { element -> element as? ExecutorCoroutineDispatcher })

    /**
     * Underlying executor of current [CoroutineDispatcher].
     */
    public abstract val executor: Executor

    /**
     * Closes this coroutine dispatcher and shuts down its executor.
     *
     * It may throw an exception if this dispatcher is global and cannot be closed.
     */
    public abstract override fun close()
}

/**
 * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
 *
 * If the underlying executor throws [RejectedExecutionException] on
 * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
 * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
 * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
 * [Dispatchers.IO], so that the affected coroutine can clean up its resources and promptly complete.
 */
@JvmName("from") // this is for a nice Java API, see issue #255
public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher =
    ExecutorCoroutineDispatcherImpl(this)

/**
 * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
 *
 * If the underlying executor throws [RejectedExecutionException] on
 * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
 * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
 * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
 * [Dispatchers.IO], so that the affected coroutine can clean up its resources and promptly complete.
 */
@JvmName("from") // this is for a nice Java API, see issue #255
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
    // An executor that merely wraps a dispatcher is unwrapped instead of being wrapped a second time.
    if (this is DispatcherExecutor) this.dispatcher else ExecutorCoroutineDispatcherImpl(this)

/**
 * Converts an instance of [CoroutineDispatcher] to an implementation of [Executor].
 *
 * It returns the original executor when used on the result of [Executor.asCoroutineDispatcher] extensions.
 */
public fun CoroutineDispatcher.asExecutor(): Executor =
    if (this is ExecutorCoroutineDispatcher) this.executor else DispatcherExecutor(this)

/**
 * [Executor] view of a [CoroutineDispatcher]: every submitted task is dispatched
 * to [dispatcher] with an [EmptyCoroutineContext].
 */
private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) : Executor {
    override fun execute(block: Runnable) {
        dispatcher.dispatch(EmptyCoroutineContext, block)
    }

    override fun toString(): String = dispatcher.toString()
}

/**
 * Dispatcher created by the `asCoroutineDispatcher` extensions; it initializes
 * future cancellation support for the wrapped [executor] on construction.
 */
private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
    init {
        initFutureCancellation()
    }
}

/**
 * Shared implementation of executor-backed dispatchers: dispatching with a fallback on
 * rejection, delay support ([Delay]) and identity-based equality on the underlying executor.
 */
internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {

    // Result of removeFutureOnCancel(executor); stays false until initFutureCancellation() is called.
    private var removesFutureOnCancellation: Boolean = false

    /**
     * Stores the result of `removeFutureOnCancel` for the current [executor].
     */
    internal fun initFutureCancellation() {
        removesFutureOnCancellation = removeFutureOnCancel(executor)
    }

    /**
     * Submits [block] to the [executor]. When the executor rejects it, the task is untracked,
     * the job in [context] is cancelled and [block] is handed over to [Dispatchers.IO].
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            executor.execute(wrapTask(block))
        } catch (rejection: RejectedExecutionException) {
            unTrackTask()
            cancelJobOnRejection(context, rejection)
            Dispatchers.IO.dispatch(context, block)
        }
    }

    /*
     * removesFutureOnCancellation is required to avoid memory leak.
     * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
     * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val scheduled: ScheduledFuture<*>? = when {
            removesFutureOnCancellation ->
                scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
            else -> null
        }
        if (scheduled == null) {
            // Nothing was scheduled on the executor -- fall back to the default executor
            DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
        } else {
            // The scheduling attempt succeeded -- tie the future to the continuation's cancellation
            continuation.cancelFutureOnCancellation(scheduled)
        }
    }

    /**
     * Schedules [block] to run after [timeMillis] milliseconds, either on the [executor]
     * or, when that is not possible, on the default executor.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val scheduled = if (removesFutureOnCancellation) scheduleBlock(block, context, timeMillis) else null
        if (scheduled != null) return DisposableFutureHandle(scheduled)
        return DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
    }

    /**
     * Tries to schedule [block] on the [executor] when it is a [ScheduledExecutorService].
     * Returns `null` when the executor is of another type or when it rejects the task;
     * in the latter case the job in [context] is cancelled as well.
     */
    private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? =
        try {
            (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
        } catch (rejection: RejectedExecutionException) {
            cancelJobOnRejection(context, rejection)
            null
        }

    /**
     * Cancels [context] with a [CancellationException] caused by the given rejection [exception].
     */
    private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) {
        val cancellation = CancellationException("The task was rejected", exception)
        context.cancel(cancellation)
    }

    /**
     * Shuts down the [executor] when it is an [ExecutorService]; does nothing otherwise.
     */
    override fun close() {
        val target = executor
        if (target is ExecutorService) target.shutdown()
    }

    override fun toString(): String = executor.toString()

    // Two dispatchers are equal only when they wrap the very same executor instance.
    override fun equals(other: Any?): Boolean {
        if (other !is ExecutorCoroutineDispatcherBase) return false
        return other.executor === executor
    }

    override fun hashCode(): Int = System.identityHashCode(executor)
}

/**
 * [Runnable] that, when run, resumes [continuation] with [Unit] via
 * `resumeUndispatched` on the given [dispatcher].
 */
private class ResumeUndispatchedRunnable(
    private val dispatcher: CoroutineDispatcher,
    private val continuation: CancellableContinuation<Unit>
) : Runnable {
    override fun run() {
        continuation.run { dispatcher.resumeUndispatched(Unit) }
    }
}

/**
 * An implementation of [DisposableHandle] that cancels the specified future on dispose.
 * @suppress **This is unstable API and it is subject to change.**
 */
private class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
    override fun dispose() {
        // `false`: the future is cancelled without interrupting a task that is already running.
        future.cancel(false)
    }

    override fun toString(): String = "DisposableFutureHandle[$future]"
}