• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlin.coroutines.*
9 
/**
 * Runs the given blocking [block] in the supplied coroutine [context] so that it can be interrupted.
 * When the coroutine is cancelled, the thread executing [block] is interrupted and this function
 * throws [CancellationException].
 *
 * Example:
 *
 * ```
 * withTimeout(500L) {            // Cancels the coroutine on timeout
 *     runInterruptible {         // Throws CancellationException if interrupted
 *         doSomethingBlocking()  // Interrupted when the coroutine is cancelled
 *     }
 * }
 * ```
 *
 * The optional [context] parameter behaves exactly like the one of [withContext].
 * It makes it possible to turn an interruptible Java method into a suspending function with a single call,
 * both moving the call to [Dispatchers.IO] and making it react to cancellation:
 *
 * ```
 * suspend fun <T> BlockingQueue<T>.awaitTake(): T =
 *         runInterruptible(Dispatchers.IO) { take() }
 * ```
 *
 * Context switching is delegated to [withContext], so [block] is invoked
 * in an [undispatched][CoroutineStart.UNDISPATCHED] manner directly by the caller when
 * the [CoroutineDispatcher] of the current [coroutineContext][currentCoroutineContext]
 * is the same as the one supplied in [context].
 *
 * @param context the context to run [block] in; defaults to [EmptyCoroutineContext].
 * @param block the blocking, non-suspending code to execute.
 * @return the value produced by [block].
 */
public suspend fun <T> runInterruptible(
    context: CoroutineContext = EmptyCoroutineContext,
    block: () -> T
): T {
    return withContext(context) {
        // Use the context of the scope created by withContext, i.e. the one block actually runs in.
        val effectiveContext = coroutineContext
        runInterruptibleInExpectedContext(effectiveContext, block)
    }
}
45 
/**
 * Executes [block] on the current thread while a [ThreadState] bound to the job of [coroutineContext]
 * is registered, so that the thread gets interrupted if that job is cancelled.
 *
 * An [InterruptedException] escaping from [block] (or from the set-up / clean-up around it)
 * is rethrown as a [CancellationException] that carries the original exception as its cause.
 */
private fun <T> runInterruptibleInExpectedContext(coroutineContext: CoroutineContext, block: () -> T): T =
    try {
        val interruptGuard = ThreadState(coroutineContext.job)
        interruptGuard.setup()
        try {
            block()
        } finally {
            // Always runs, so a pending interrupt is cleared before control leaves this function.
            interruptGuard.clearInterrupt()
        }
    } catch (e: InterruptedException) {
        throw CancellationException("Blocking call was interrupted due to parent cancellation").initCause(e)
    }
59 
// Values stored in ThreadState._state.

// The blocking code is running (initial state).
private const val WORKING = 0

// The blocking code is done and no interrupt was issued.
private const val FINISHED = 1

// Cancellation was observed and the target thread is about to be interrupted.
private const val INTERRUPTING = 2

// The target thread has been interrupted.
private const val INTERRUPTED = 3
64 
/**
 * Small state machine that links the thread it was created on with [job]:
 * once registered via [setup], the handler interrupts that thread while the state is still WORKING,
 * and [clearInterrupt] makes sure the interrupt flag does not outlive the blocking section.
 */
private class ThreadState(private val job: Job) : CompletionHandler {
    /*
       === States ===

       WORKING:      running normally (initial state)
       FINISHED:     completed normally
       INTERRUPTING: cancelled, about to interrupt the target thread
       INTERRUPTED:  the target thread has been interrupted

       === Transitions ===

       WORKING      --(invoke: CAS succeeds)-----------> INTERRUPTING
       INTERRUPTING --(invoke: after Thread.interrupt)-> INTERRUPTED
       WORKING      --(clearInterrupt: CAS succeeds)---> FINISHED

       The WORKING -> INTERRUPTING transition may happen both before and after
       the cancellation handle has been stored by setup().
       FINISHED and INTERRUPTED are terminal.
    */
    private val _state = atomic(WORKING)

    // Thread that constructed this instance; it is the one that gets interrupted.
    private val targetThread = Thread.currentThread()

    // Handle of the registered cancellation handler, disposed on normal completion.
    private var cancelHandle: DisposableHandle? = null

    /**
     * Registers this object as a handler on [job] (with `onCancelling = true` and `invokeImmediately = true`)
     * and remembers the returned handle.
     */
    fun setup() {
        cancelHandle = job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
        // At this point the handle is stored, or the handler has already fired.
        while (true) {
            val current = _state.value
            when (current) {
                // Normal path: nothing was cancelled so far, proceed.
                WORKING -> if (_state.compareAndSet(current, WORKING)) return
                // Cancelled right away: nothing more to do here.
                INTERRUPTING, INTERRUPTED -> return
                else -> invalidState(current)
            }
        }
    }

    /**
     * Ends the interruptible section: either marks it FINISHED and disposes the handler,
     * or waits for an in-flight interrupt and clears the interrupt flag of the current thread.
     */
    fun clearInterrupt() {
        // An interrupt that was requested but not yet consumed must not leak to the caller.
        while (true) {
            val current = _state.value
            when (current) {
                WORKING -> if (_state.compareAndSet(current, FINISHED)) {
                    cancelHandle?.dispose()
                    return
                }
                // The handler is interrupting the thread right now: spin until it
                // reaches INTERRUPTED so the flag can be cleared afterwards.
                INTERRUPTING -> Unit
                INTERRUPTED -> {
                    // Reset the interrupt flag and leave.
                    Thread.interrupted()
                    return
                }
                else -> invalidState(current)
            }
        }
    }

    /**
     * Cancellation handler: interrupts [targetThread] if the blocking section is still WORKING.
     */
    override fun invoke(cause: Throwable?) {
        while (true) {
            val current = _state.value
            when (current) {
                // Already finished, or an interrupt is in progress / done: nothing to do.
                FINISHED, INTERRUPTING, INTERRUPTED -> return
                // Still working: claim the transition, then interrupt the thread.
                WORKING -> if (_state.compareAndSet(current, INTERRUPTING)) {
                    targetThread.interrupt()
                    _state.value = INTERRUPTED
                    return
                }
                else -> invalidState(current)
            }
        }
    }

    // Fails with IllegalStateException for a state value outside the four known ones.
    private fun invalidState(state: Int): Nothing = error("Illegal state $state")
}
162