• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines
2 
3 import kotlinx.atomicfu.*
4 import kotlin.coroutines.*
5 
6 /**
7  * Calls the specified [block] with a given coroutine context in
8  * [an interruptible manner](https://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html).
9  * The blocking code block will be interrupted and this function will throw [CancellationException]
10  * if the coroutine is cancelled.
11  *
12  * Example:
13  *
14  * ```
15  * withTimeout(500L) {            // Cancels coroutine on timeout
16  *     runInterruptible {         // Throws CancellationException if interrupted
17  *         doSomethingBlocking()  // Interrupted on coroutines cancellation
18  *     }
19  * }
20  * ```
21  *
22  * There is an optional [context] parameter to this function working just like [withContext].
23  * It enables single-call conversion of interruptible Java methods into suspending functions.
24  * With one call here we are moving the call to [Dispatchers.IO] and supporting interruption:
25  *
26  * ```
27  * suspend fun <T> BlockingQueue<T>.awaitTake(): T =
28  *         runInterruptible(Dispatchers.IO) { queue.take() }
29  * ```
30  *
31  * `runInterruptible` uses [withContext] as an underlying mechanism for switching context,
32  * meaning that the supplied [block] is invoked in an [undispatched][CoroutineStart.UNDISPATCHED]
33  * manner directly by the caller if [CoroutineDispatcher] from the current [coroutineContext][currentCoroutineContext]
34  * is the same as the one supplied in [context].
35  */
36 public suspend fun <T> runInterruptible(
37     context: CoroutineContext = EmptyCoroutineContext,
38     block: () -> T
39 ): T = withContext(context) {
40     runInterruptibleInExpectedContext(coroutineContext, block)
41 }
42 
runInterruptibleInExpectedContextnull43 private fun <T> runInterruptibleInExpectedContext(coroutineContext: CoroutineContext, block: () -> T): T {
44     try {
45         val threadState = ThreadState()
46         threadState.setup(coroutineContext.job)
47         try {
48             return block()
49         } finally {
50             threadState.clearInterrupt()
51         }
52     } catch (e: InterruptedException) {
53         throw CancellationException("Blocking call was interrupted due to parent cancellation").initCause(e)
54     }
55 }
56 
57 private const val WORKING = 0
58 private const val FINISHED = 1
59 private const val INTERRUPTING = 2
60 private const val INTERRUPTED = 3
61 
62 private class ThreadState : JobNode() {
63     /*
64        === States ===
65 
66        WORKING: running normally
67        FINISH: complete normally
68        INTERRUPTING: canceled, going to interrupt this thread
69        INTERRUPTED: this thread is interrupted
70 
71        === Possible Transitions ===
72 
73        +----------------+         register job       +-------------------------+
74        |    WORKING     |   cancellation listener    |         WORKING         |
75        | (thread, null) | -------------------------> | (thread, cancel handle) |
76        +----------------+                            +-------------------------+
77                |                                                |   |
78                | cancel                                  cancel |   | complete
79                |                                                |   |
80                V                                                |   |
81        +---------------+                                        |   |
82        | INTERRUPTING  | <--------------------------------------+   |
83        +---------------+                                            |
84                |                                                    |
85                | interrupt                                          |
86                |                                                    |
87                V                                                    V
88        +---------------+                              +-------------------------+
89        |  INTERRUPTED  |                              |         FINISHED        |
90        +---------------+                              +-------------------------+
91     */
92     private val _state = atomic(WORKING)
93     private val targetThread = Thread.currentThread()
94 
95     // Registered cancellation handler
96     private var cancelHandle: DisposableHandle? = null
97 
98     override val onCancelling get() = true
99 
setupnull100     fun setup(job: Job) {
101         cancelHandle = job.invokeOnCompletion(handler = this)
102         // Either we successfully stored it or it was immediately cancelled
103         _state.loop { state ->
104             when (state) {
105                 // Happy-path, move forward
106                 WORKING -> if (_state.compareAndSet(state, WORKING)) return
107                 // Immediately cancelled, just continue
108                 INTERRUPTING, INTERRUPTED -> return
109                 else -> invalidState(state)
110             }
111         }
112     }
113 
clearInterruptnull114     fun clearInterrupt() {
115         /*
116          * Do not allow to untriggered interrupt to leak
117          */
118         _state.loop { state ->
119             when (state) {
120                 WORKING -> if (_state.compareAndSet(state, FINISHED)) {
121                     cancelHandle?.dispose()
122                     return
123                 }
124                 INTERRUPTING -> {
125                    /*
126                     * Spin, cancellation mechanism is interrupting our thread right now
127                     * and we have to wait it and then clear interrupt status
128                     */
129                 }
130                 INTERRUPTED -> {
131                     // Clear it and bail out
132                     Thread.interrupted()
133                     return
134                 }
135                 else -> invalidState(state)
136             }
137         }
138     }
139 
140     // Cancellation handler
invokenull141     override fun invoke(cause: Throwable?) {
142         _state.loop { state ->
143             when (state) {
144                 // Working -> try to transite state and interrupt the thread
145                 WORKING -> {
146                     if (_state.compareAndSet(state, INTERRUPTING)) {
147                         targetThread.interrupt()
148                         _state.value = INTERRUPTED
149                         return
150                     }
151                 }
152                 // Finished -- runInterruptible is already complete, INTERRUPTING - ignore
153                 FINISHED, INTERRUPTING, INTERRUPTED -> return
154                 else -> invalidState(state)
155             }
156         }
157     }
158 
invalidStatenull159     private fun invalidState(state: Int): Nothing = error("Illegal state $state")
160 }
161