/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.flow.*
8 import kotlinx.coroutines.internal.*
9 import java.io.*
10 import java.util.concurrent.*
11 import kotlin.coroutines.*
12 
/**
 * A [CoroutineDispatcher] that is backed by an [Executor] to which it dispatches its tasks.
 * The owner of an [ExecutorCoroutineDispatcher] instance is expected to close it.
 *
 * The typical use of this class is bridging a coroutine-based API with an
 * asynchronous API that needs an instance of [Executor].
 */
public abstract class ExecutorCoroutineDispatcher : CoroutineDispatcher(), Closeable {
    /**
     * The executor that backs this [CoroutineDispatcher].
     */
    public abstract val executor: Executor

    /**
     * Closes this coroutine dispatcher and shuts down its executor.
     *
     * An exception may be thrown when this dispatcher is global and cannot be closed.
     */
    public abstract override fun close()

    /** @suppress */
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<CoroutineDispatcher, ExecutorCoroutineDispatcher>(
        CoroutineDispatcher,
        // Only elements that actually are executor-backed dispatchers match this key.
        { element -> element as? ExecutorCoroutineDispatcher }
    )
}
39 
// Platform-specific (`actual`) side of the CloseableCoroutineDispatcher declaration:
// here the name is simply an alias for ExecutorCoroutineDispatcher.
@ExperimentalCoroutinesApi
public actual typealias CloseableCoroutineDispatcher = ExecutorCoroutineDispatcher
42 
/**
 * Wraps this [ExecutorService] into an [ExecutorCoroutineDispatcher].
 *
 * ## Interaction with [delay] and time-based coroutines.
 *
 * When this [ExecutorService] is a [ScheduledExecutorService], every time-related coroutine
 * operation, such as [delay], [withTimeout] and the time-based [Flow] operators, is scheduled
 * on it through the [schedule][ScheduledExecutorService.schedule] method. When the corresponding
 * coroutine is cancelled, [ScheduledFuture.cancel] is invoked on the corresponding future.
 *
 * When this [ExecutorService] is a [ScheduledThreadPoolExecutor], the remove-on-cancel policy is
 * set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] before anything is scheduled,
 * in order to reduce the memory pressure of cancelled coroutines.
 *
 * When the executor service is neither of these types, a separate internal thread is used to
 * _track_ the delay and time-related executions, while the coroutine itself is still executed
 * on top of the given executor.
 *
 * ## Rejected execution
 *
 * If the underlying executor throws [RejectedExecutionException] on an attempt to submit
 * a continuation task (this happens when [closing][ExecutorCoroutineDispatcher.close] the
 * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when
 * it uses limited queues), then the [Job] of the affected task is [cancelled][Job.cancel] and
 * the task is submitted to [Dispatchers.IO], so that the affected coroutine can clean up its
 * resources and promptly complete.
 */
@JvmName("from") // this is for a nice Java API, see issue #255
public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher {
    return ExecutorCoroutineDispatcherImpl(this)
}
71 
/**
 * Wraps this [Executor] into a [CoroutineDispatcher].
 *
 * An executor that was itself obtained from [CoroutineDispatcher.asExecutor] is unwrapped
 * back to the dispatcher it delegates to instead of being wrapped again.
 *
 * ## Interaction with [delay] and time-based coroutines.
 *
 * When this [Executor] is a [ScheduledExecutorService], every time-related coroutine
 * operation, such as [delay], [withTimeout] and the time-based [Flow] operators, is scheduled
 * on it through the [schedule][ScheduledExecutorService.schedule] method. When the corresponding
 * coroutine is cancelled, [ScheduledFuture.cancel] is invoked on the corresponding future.
 *
 * When this [Executor] is a [ScheduledThreadPoolExecutor], the remove-on-cancel policy is
 * set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] before anything is scheduled,
 * in order to reduce the memory pressure of cancelled coroutines.
 *
 * When the executor is neither of these types, a separate internal thread is used to
 * _track_ the delay and time-related executions, while the coroutine itself is still executed
 * on top of the given executor.
 *
 * ## Rejected execution
 *
 * If the underlying executor throws [RejectedExecutionException] on an attempt to submit
 * a continuation task (this happens when [closing][ExecutorCoroutineDispatcher.close] the
 * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when
 * it uses limited queues), then the [Job] of the affected task is [cancelled][Job.cancel] and
 * the task is submitted to [Dispatchers.IO], so that the affected coroutine can clean up its
 * resources and promptly complete.
 */
@JvmName("from") // this is for a nice Java API, see issue #255
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher {
    // An executor view of a dispatcher is unwrapped rather than wrapped a second time.
    if (this is DispatcherExecutor) return this.dispatcher
    return ExecutorCoroutineDispatcherImpl(this)
}
101 
/**
 * Exposes this [CoroutineDispatcher] as an [Executor].
 *
 * A dispatcher produced by one of the [Executor.asCoroutineDispatcher] extensions yields its
 * original executor; any other dispatcher is wrapped into an executor that delegates to it.
 */
public fun CoroutineDispatcher.asExecutor(): Executor = when (this) {
    is ExecutorCoroutineDispatcher -> this.executor
    else -> DispatcherExecutor(this)
}
109 
/**
 * [Executor] view over a [CoroutineDispatcher]: submitted blocks are handed to [dispatcher]
 * with an empty coroutine context, or run in place when the dispatcher reports that no
 * dispatch is needed.
 */
private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) : Executor {
    override fun execute(block: Runnable) {
        val context = EmptyCoroutineContext
        if (!dispatcher.isDispatchNeeded(context)) {
            // The dispatcher does not require a dispatch -- run on the calling thread.
            block.run()
            return
        }
        dispatcher.dispatch(context, block)
    }

    /** Delegates to the string form of the wrapped [dispatcher]. */
    override fun toString(): String = dispatcher.toString()
}
121 
/**
 * [ExecutorCoroutineDispatcher] that runs tasks on the given [executor] and implements [Delay].
 *
 * Timed operations are scheduled on [executor] when it is a [ScheduledExecutorService] and
 * accepts the task; otherwise they are delegated to [DefaultExecutor].
 */
internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {

    /*
     * Tries to invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy reflectively
     * (to stay Java 6 compatible) so that the internal scheduler queue is cleaned up
     * on cancellation.
     */
    init {
        removeFutureOnCancel(executor)
    }

    /**
     * Submits [block] to [executor]. When the executor rejects the task, the job in [context]
     * is cancelled and [block] is dispatched on [Dispatchers.IO] instead.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            executor.execute(wrapTask(block))
        } catch (rejection: RejectedExecutionException) {
            // NOTE(review): presumably balances the tracking started by wrapTask -- confirm.
            unTrackTask()
            cancelJobOnRejection(context, rejection)
            Dispatchers.IO.dispatch(context, block)
        }
    }

    /**
     * Arranges for [continuation] to be resumed after [timeMillis] milliseconds, using
     * [executor] when it is able to schedule the resumption and [DefaultExecutor] otherwise.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val scheduler = executor as? ScheduledExecutorService
        val scheduled = scheduler?.scheduleBlock(
            ResumeUndispatchedRunnable(this, continuation),
            continuation.context,
            timeMillis
        )
        if (scheduled == null) {
            // Either the executor cannot schedule or it rejected the task -- use the default executor.
            DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
        } else {
            // Scheduling succeeded: tie the future's lifetime to the continuation.
            continuation.cancelFutureOnCancellation(scheduled)
        }
    }

    /**
     * Schedules [block] to run after [timeMillis] milliseconds and returns a handle for it:
     * a future-cancelling handle when [executor] scheduled the block, or whatever
     * [DefaultExecutor] returns otherwise.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val scheduled = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
        return if (scheduled != null) {
            DisposableFutureHandle(scheduled)
        } else {
            DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
        }
    }

    /**
     * Schedules [block] on the receiver after [timeMillis] milliseconds.
     * Returns `null`, after cancelling the job in [context], when the receiver rejects the task.
     */
    private fun ScheduledExecutorService.scheduleBlock(
        block: Runnable,
        context: CoroutineContext,
        timeMillis: Long
    ): ScheduledFuture<*>? =
        try {
            schedule(block, timeMillis, TimeUnit.MILLISECONDS)
        } catch (rejection: RejectedExecutionException) {
            cancelJobOnRejection(context, rejection)
            null
        }

    /** Cancels [context] with a [CancellationException] that carries [exception] as its cause. */
    private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) {
        val cause = CancellationException("The task was rejected", exception)
        context.cancel(cause)
    }

    /** Shuts down [executor] when it is an [ExecutorService]; does nothing for other executors. */
    override fun close() {
        val service = (executor as? ExecutorService) ?: return
        service.shutdown()
    }

    override fun toString(): String = executor.toString()

    /** Two instances are equal when they wrap the very same executor instance. */
    override fun equals(other: Any?): Boolean {
        if (other !is ExecutorCoroutineDispatcherImpl) return false
        return other.executor === executor
    }

    /** Identity-based hash, consistent with the identity comparison in [equals]. */
    override fun hashCode(): Int = System.identityHashCode(executor)
}
187 
/**
 * [Runnable] that, when run, resumes [continuation] with [Unit] through
 * `resumeUndispatched` on the given [dispatcher].
 */
private class ResumeUndispatchedRunnable(
    private val dispatcher: CoroutineDispatcher,
    private val continuation: CancellableContinuation<Unit>
) : Runnable {
    override fun run() {
        // resumeUndispatched is invoked with the continuation as the scope receiver.
        continuation.run { dispatcher.resumeUndispatched(Unit) }
    }
}
196 
/**
 * A [DisposableHandle] whose disposal cancels the wrapped [future].
 * @suppress **This is unstable API and it is subject to change.**
 */
private class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
    /** Cancels [future] without requesting interruption of a task that is already running. */
    override fun dispose() {
        future.cancel(/* mayInterruptIfRunning = */ false)
    }

    override fun toString(): String = "DisposableFutureHandle[$future]"
}
207