• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.internal.*
8 import java.io.*
9 import java.util.concurrent.*
10 import kotlin.coroutines.*
11 
12 /**
13  * [CoroutineDispatcher] that has underlying [Executor] for dispatching tasks.
14  * Instances of [ExecutorCoroutineDispatcher] should be closed by the owner of the dispatcher.
15  *
16  * This class is generally used as a bridge between coroutine-based API and
17  * asynchronous API that requires an instance of the [Executor].
18  */
19 public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
20     /** @suppress */
21     @ExperimentalStdlibApi
22     public companion object Key : AbstractCoroutineContextKey<CoroutineDispatcher, ExecutorCoroutineDispatcher>(
23         CoroutineDispatcher,
<lambda>null24         { it as? ExecutorCoroutineDispatcher })
25 
26     /**
27      * Underlying executor of current [CoroutineDispatcher].
28      */
29     public abstract val executor: Executor
30 
31     /**
32      * Closes this coroutine dispatcher and shuts down its executor.
33      *
34      * It may throw an exception if this dispatcher is global and cannot be closed.
35      */
closenull36     public abstract override fun close()
37 }
38 
39 /**
40  * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
41  *
42  * If the underlying executor throws [RejectedExecutionException] on
43  * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
44  * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
45  * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
46  * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
47  */
48 @JvmName("from") // this is for a nice Java API, see issue #255
49 public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher =
50     ExecutorCoroutineDispatcherImpl(this)
51 
52 /**
53  * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
54  *
55  * If the underlying executor throws [RejectedExecutionException] on
56  * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
57  * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
58  * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
59  * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
60  */
61 @JvmName("from") // this is for a nice Java API, see issue #255
62 public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
63     (this as? DispatcherExecutor)?.dispatcher ?: ExecutorCoroutineDispatcherImpl(this)
64 
65 /**
66  * Converts an instance of [CoroutineDispatcher] to an implementation of [Executor].
67  *
68  * It returns the original executor when used on the result of [Executor.asCoroutineDispatcher] extensions.
69  */
70 public fun CoroutineDispatcher.asExecutor(): Executor =
71     (this as? ExecutorCoroutineDispatcher)?.executor ?: DispatcherExecutor(this)
72 
73 private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) : Executor {
74     override fun execute(block: Runnable) = dispatcher.dispatch(EmptyCoroutineContext, block)
75     override fun toString(): String = dispatcher.toString()
76 }
77 
78 private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
79     init {
80         initFutureCancellation()
81     }
82 }
83 
84 internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
85 
86     private var removesFutureOnCancellation: Boolean = false
87 
initFutureCancellationnull88     internal fun initFutureCancellation() {
89         removesFutureOnCancellation = removeFutureOnCancel(executor)
90     }
91 
dispatchnull92     override fun dispatch(context: CoroutineContext, block: Runnable) {
93         try {
94             executor.execute(wrapTask(block))
95         } catch (e: RejectedExecutionException) {
96             unTrackTask()
97             cancelJobOnRejection(context, e)
98             Dispatchers.IO.dispatch(context, block)
99         }
100     }
101 
102     /*
103      * removesFutureOnCancellation is required to avoid memory leak.
104      * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
105      * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
106      */
scheduleResumeAfterDelaynull107     override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
108         val future = if (removesFutureOnCancellation) {
109             scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
110         } else {
111             null
112         }
113         // If everything went fine and the scheduling attempt was not rejected -- use it
114         if (future != null) {
115             continuation.cancelFutureOnCancellation(future)
116             return
117         }
118         // Otherwise fallback to default executor
119         DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
120     }
121 
invokeOnTimeoutnull122     override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
123         val future = if (removesFutureOnCancellation) {
124             scheduleBlock(block, context, timeMillis)
125         } else {
126             null
127         }
128         return when {
129             future != null -> DisposableFutureHandle(future)
130             else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
131         }
132     }
133 
scheduleBlocknull134     private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
135         return try {
136             (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
137         } catch (e: RejectedExecutionException) {
138             cancelJobOnRejection(context, e)
139             null
140         }
141     }
142 
cancelJobOnRejectionnull143     private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) {
144         context.cancel(CancellationException("The task was rejected", exception))
145     }
146 
closenull147     override fun close() {
148         (executor as? ExecutorService)?.shutdown()
149     }
150 
toStringnull151     override fun toString(): String = executor.toString()
152     override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
153     override fun hashCode(): Int = System.identityHashCode(executor)
154 }
155 
156 private class ResumeUndispatchedRunnable(
157     private val dispatcher: CoroutineDispatcher,
158     private val continuation: CancellableContinuation<Unit>
159 ) : Runnable {
160     override fun run() {
161         with(continuation) { dispatcher.resumeUndispatched(Unit) }
162     }
163 }
164 
165 /**
166  * An implementation of [DisposableHandle] that cancels the specified future on dispose.
167  * @suppress **This is unstable API and it is subject to change.**
168  */
169 private class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
disposenull170     override fun dispose() {
171         future.cancel(false)
172     }
toStringnull173     override fun toString(): String = "DisposableFutureHandle[$future]"
174 }
175