• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlin.coroutines.*
9 
/**
 * Runs the blocking [block] in the supplied coroutine [context] in
 * [an interruptible manner](https://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html).
 * If the coroutine is cancelled while [block] is executing, the thread running the block is interrupted
 * and this function throws [CancellationException].
 *
 * Example:
 *
 * ```
 * withTimeout(500L) {            // Cancels coroutine on timeout
 *     runInterruptible {         // Throws CancellationException if interrupted
 *         doSomethingBlocking()  // Interrupted on coroutines cancellation
 *     }
 * }
 * ```
 *
 * The optional [context] parameter behaves just like the one accepted by [withContext].
 * It allows an interruptible Java method to be converted into a suspending function with a single call.
 * The snippet below both moves the call to [Dispatchers.IO] and makes it interruptible:
 *
 * ```
 * suspend fun <T> BlockingQueue<T>.awaitTake(): T =
 *         runInterruptible(Dispatchers.IO) { take() }
 * ```
 *
 * `runInterruptible` relies on [withContext] for switching the context,
 * which means that [block] is invoked in an [undispatched][CoroutineStart.UNDISPATCHED]
 * manner directly by the caller when the [CoroutineDispatcher] of the current
 * [coroutineContext][currentCoroutineContext] is the same as the one supplied in [context].
 *
 * @param context additional context elements to run [block] with; empty by default.
 * @param block the blocking, interruptible code to execute.
 * @return the value produced by [block].
 */
public suspend fun <T> runInterruptible(
    context: CoroutineContext = EmptyCoroutineContext,
    block: () -> T
): T {
    return withContext(context) {
        // Context of the scope created by withContext; its job is the one observed for cancellation
        val effectiveContext = coroutineContext
        runInterruptibleInExpectedContext(effectiveContext, block)
    }
}
46 
/**
 * Invokes [block] on the current thread while a [ThreadState] bound to the job of [coroutineContext]
 * is set up, and clears that state once the block has finished (normally or exceptionally).
 *
 * An [InterruptedException] escaping from this sequence is rethrown as a [CancellationException]
 * that carries the original exception as its cause.
 *
 * @param coroutineContext the context whose job is watched for cancellation.
 * @param block the blocking code to run.
 * @return the value produced by [block].
 */
private fun <T> runInterruptibleInExpectedContext(coroutineContext: CoroutineContext, block: () -> T): T =
    try {
        val interruptGuard = ThreadState(coroutineContext.job)
        interruptGuard.setup()
        try {
            block()
        } finally {
            // Always executed, so that a pending or in-flight interrupt does not outlive this call
            interruptGuard.clearInterrupt()
        }
    } catch (interruption: InterruptedException) {
        throw CancellationException("Blocking call was interrupted due to parent cancellation").initCause(interruption)
    }
60 
/** The blocking block is running normally. */
private const val WORKING: Int = 0

/** The blocking block completed before any interruption was started. */
private const val FINISHED: Int = 1

/** Cancellation was observed and the target thread is about to be interrupted. */
private const val INTERRUPTING: Int = 2

/** The target thread has been interrupted by the cancellation handler. */
private const val INTERRUPTED: Int = 3
65 
/**
 * Interruption bookkeeping for a single blocking call.
 *
 * The instance remembers the thread it was created on and, once [setup] has been called, is registered
 * on [job] as a handler that is invoked when the job starts cancelling. The handler interrupts the
 * remembered thread unless [clearInterrupt] has already marked the call as finished.
 */
private class ThreadState(private val job: Job) : CompletionHandler {
    /*
       === States ===

       WORKING:      running normally
       FINISHED:     completed normally
       INTERRUPTING: cancelled, the handler is about to interrupt the target thread
       INTERRUPTED:  the target thread has been interrupted

       === Transitions ===

                        clearInterrupt()
           WORKING ---------------------------> FINISHED
              |
              | invoke() (cancellation handler)
              V
         INTERRUPTING
              |
              | targetThread.interrupt()
              V
         INTERRUPTED
    */
    private val _state = atomic(WORKING)

    // The thread this instance was created on; it is the one that gets interrupted
    private val targetThread = Thread.currentThread()

    // Handle of the handler registered on the job by setup()
    private var cancelHandle: DisposableHandle? = null

    /**
     * Registers this instance on [job] as an `onCancelling` handler that is invoked immediately
     * if the job is already cancelling, and remembers the returned handle.
     */
    fun setup() {
        cancelHandle = job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
        // At this point the handler is either registered or has already been invoked
        _state.loop { current ->
            when (current) {
                // The handler has already started (or completed) the interruption, nothing more to do
                INTERRUPTING, INTERRUPTED -> return
                // Regular path: the state is left as WORKING
                WORKING -> {
                    if (_state.compareAndSet(current, WORKING)) return
                }
                else -> invalidState(current)
            }
        }
    }

    /**
     * Marks the blocking call as finished and makes sure that an interrupt issued by the
     * cancellation handler does not stay set on the current thread.
     */
    fun clearInterrupt() {
        _state.loop { current ->
            when (current) {
                INTERRUPTED -> {
                    // The interrupt has been delivered: reset the interrupted status and leave
                    Thread.interrupted()
                    return
                }
                // The handler is interrupting this thread right now: keep looping until it is done,
                // so that the interrupted status can be cleared afterwards
                INTERRUPTING -> Unit
                WORKING -> {
                    if (_state.compareAndSet(current, FINISHED)) {
                        // No interruption was started, the handler is not needed anymore
                        cancelHandle?.dispose()
                        return
                    }
                }
                else -> invalidState(current)
            }
        }
    }

    /**
     * Cancellation handler: interrupts [targetThread] if the blocking call is still in the WORKING state.
     *
     * @param cause the cause passed by the job; it is not used.
     */
    override fun invoke(cause: Throwable?) {
        _state.loop { current ->
            when (current) {
                // The call has already finished, or the interruption is already underway or done
                FINISHED, INTERRUPTING, INTERRUPTED -> return
                WORKING -> {
                    // Claim the interruption first, so that clearInterrupt() waits for it to complete
                    val claimed = _state.compareAndSet(current, INTERRUPTING)
                    if (claimed) {
                        targetThread.interrupt()
                        _state.value = INTERRUPTED
                        return
                    }
                }
                else -> invalidState(current)
            }
        }
    }

    // Reports a state value that is not one of the four known constants
    private fun invalidState(state: Int): Nothing {
        throw IllegalStateException("Illegal state $state")
    }
}
163