• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.flow.*
8 import kotlinx.coroutines.internal.*
9 import java.io.*
10 import java.util.concurrent.*
11 import kotlin.coroutines.*
12 
13 /**
14  * [CoroutineDispatcher] that has underlying [Executor] for dispatching tasks.
15  * Instances of [ExecutorCoroutineDispatcher] should be closed by the owner of the dispatcher.
16  *
17  * This class is generally used as a bridge between coroutine-based API and
18  * asynchronous API that requires an instance of the [Executor].
19  */
20 public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
21     /** @suppress */
22     @ExperimentalStdlibApi
23     public companion object Key : AbstractCoroutineContextKey<CoroutineDispatcher, ExecutorCoroutineDispatcher>(
24         CoroutineDispatcher,
<lambda>null25         { it as? ExecutorCoroutineDispatcher })
26 
27     /**
28      * Underlying executor of current [CoroutineDispatcher].
29      */
30     public abstract val executor: Executor
31 
32     /**
33      * Closes this coroutine dispatcher and shuts down its executor.
34      *
35      * It may throw an exception if this dispatcher is global and cannot be closed.
36      */
closenull37     public abstract override fun close()
38 }
39 
40 @ExperimentalCoroutinesApi
41 public actual typealias CloseableCoroutineDispatcher = ExecutorCoroutineDispatcher
42 
43 /**
44  * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
45  *
46  * ## Interaction with [delay] and time-based coroutines.
47  *
48  * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related
49  * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
50  * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
51  * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
52  *
53  * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
54  * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
55  * to reduce the memory pressure of cancelled coroutines.
56  *
57  * If the executor service is neither of this types, the separate internal thread will be used to
58  * _track_ the delay and time-related executions, but the coroutine itself will still be executed
59  * on top of the given executor.
60  *
61  * ## Rejected execution
62  * If the underlying executor throws [RejectedExecutionException] on
63  * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
64  * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
65  * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
66  * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
67  */
68 @JvmName("from") // this is for a nice Java API, see issue #255
69 public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher =
70     ExecutorCoroutineDispatcherImpl(this)
71 
72 /**
73  * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
74  *
75  * ## Interaction with [delay] and time-based coroutines.
76  *
77  * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related
78  * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
79  * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
80  * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
81  *
82  * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
83  * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
84  * to reduce the memory pressure of cancelled coroutines.
85  *
86  * If the executor is neither of this types, the separate internal thread will be used to
87  * _track_ the delay and time-related executions, but the coroutine itself will still be executed
88  * on top of the given executor.
89  *
90  * ## Rejected execution
91  *
92  * If the underlying executor throws [RejectedExecutionException] on
93  * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
94  * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
95  * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
96  * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
97  */
98 @JvmName("from") // this is for a nice Java API, see issue #255
99 public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
100     (this as? DispatcherExecutor)?.dispatcher ?: ExecutorCoroutineDispatcherImpl(this)
101 
102 /**
103  * Converts an instance of [CoroutineDispatcher] to an implementation of [Executor].
104  *
105  * It returns the original executor when used on the result of [Executor.asCoroutineDispatcher] extensions.
106  */
107 public fun CoroutineDispatcher.asExecutor(): Executor =
108     (this as? ExecutorCoroutineDispatcher)?.executor ?: DispatcherExecutor(this)
109 
110 private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) : Executor {
111     override fun execute(block: Runnable) = dispatcher.dispatch(EmptyCoroutineContext, block)
112     override fun toString(): String = dispatcher.toString()
113 }
114 
115 internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {
116 
117     /*
118      * Attempts to reflectively (to be Java 6 compatible) invoke
119      * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup
120      * internal scheduler queue on cancellation.
121      */
122     init {
123         removeFutureOnCancel(executor)
124     }
125 
dispatchnull126     override fun dispatch(context: CoroutineContext, block: Runnable) {
127         try {
128             executor.execute(wrapTask(block))
129         } catch (e: RejectedExecutionException) {
130             unTrackTask()
131             cancelJobOnRejection(context, e)
132             Dispatchers.IO.dispatch(context, block)
133         }
134     }
135 
scheduleResumeAfterDelaynull136     override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
137         val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
138             ResumeUndispatchedRunnable(this, continuation),
139             continuation.context,
140             timeMillis
141         )
142         // If everything went fine and the scheduling attempt was not rejected -- use it
143         if (future != null) {
144             continuation.cancelFutureOnCancellation(future)
145             return
146         }
147         // Otherwise fallback to default executor
148         DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
149     }
150 
invokeOnTimeoutnull151     override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
152         val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
153         return when {
154             future != null -> DisposableFutureHandle(future)
155             else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
156         }
157     }
158 
ScheduledExecutorServicenull159     private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
160         return try {
161             schedule(block, timeMillis, TimeUnit.MILLISECONDS)
162         } catch (e: RejectedExecutionException) {
163             cancelJobOnRejection(context, e)
164             null
165         }
166     }
167 
cancelJobOnRejectionnull168     private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) {
169         context.cancel(CancellationException("The task was rejected", exception))
170     }
171 
closenull172     override fun close() {
173         (executor as? ExecutorService)?.shutdown()
174     }
175 
toStringnull176     override fun toString(): String = executor.toString()
177     override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor
178     override fun hashCode(): Int = System.identityHashCode(executor)
179 }
180 
181 private class ResumeUndispatchedRunnable(
182     private val dispatcher: CoroutineDispatcher,
183     private val continuation: CancellableContinuation<Unit>
184 ) : Runnable {
185     override fun run() {
186         with(continuation) { dispatcher.resumeUndispatched(Unit) }
187     }
188 }
189 
190 /**
191  * An implementation of [DisposableHandle] that cancels the specified future on dispose.
192  * @suppress **This is unstable API and it is subject to change.**
193  */
194 private class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
disposenull195     override fun dispose() {
196         future.cancel(false)
197     }
toStringnull198     override fun toString(): String = "DisposableFutureHandle[$future]"
199 }
200