<lambda>null1 package kotlinx.coroutines
2
3 import kotlinx.atomicfu.*
4 import kotlin.coroutines.*
5
/**
 * Runs the blocking [block] in the given coroutine [context] in
 * [an interruptible manner](https://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html):
 * if the coroutine is cancelled, the blocking code is interrupted and this function
 * throws [CancellationException].
 *
 * Example:
 *
 * ```
 * withTimeout(500L) {            // Cancels the coroutine on timeout
 *     runInterruptible {         // Throws CancellationException if interrupted
 *         doSomethingBlocking()  // Interrupted when the coroutine is cancelled
 *     }
 * }
 * ```
 *
 * The optional [context] parameter works just like the one of [withContext]. It allows an
 * interruptible Java method to be turned into a suspending function with a single call.
 * In the snippet below one call both moves the blocking call to [Dispatchers.IO] and
 * makes it interruptible:
 *
 * ```
 * suspend fun <T> BlockingQueue<T>.awaitTake(): T =
 *     runInterruptible(Dispatchers.IO) { take() }
 * ```
 *
 * `runInterruptible` relies on [withContext] to switch the context, which means that [block]
 * is invoked in an [undispatched][CoroutineStart.UNDISPATCHED] manner directly by the caller
 * when the [CoroutineDispatcher] of the current [coroutineContext][currentCoroutineContext]
 * is the same as the one supplied in [context].
 *
 * @param context context to run [block] in; defaults to [EmptyCoroutineContext].
 * @param block the blocking, interruptible code to execute.
 * @return the value produced by [block].
 */
public suspend fun <T> runInterruptible(
    context: CoroutineContext = EmptyCoroutineContext,
    block: () -> T
): T {
    return withContext(context) {
        // The receiver is the scope opened by withContext; its context carries the job to watch.
        runInterruptibleInExpectedContext(this.coroutineContext, block)
    }
}
42
/**
 * Invokes [block] on the current thread while a [ThreadState] is registered on the job of
 * [coroutineContext], so that cancellation of that job interrupts this thread.
 *
 * [ThreadState.clearInterrupt] is always called once [block] has finished, whether it
 * returned a value or threw.
 *
 * @param coroutineContext the context whose [Job] is observed for cancellation.
 * @param block the blocking code to run.
 * @return the value returned by [block].
 * @throws CancellationException when an [InterruptedException] escapes from the guarded section;
 * the original exception is attached as the cause.
 */
private fun <T> runInterruptibleInExpectedContext(coroutineContext: CoroutineContext, block: () -> T): T =
    try {
        // ThreadState records the current thread when constructed, then hooks into the job.
        val interruptGuard = ThreadState().also { it.setup(coroutineContext.job) }
        try {
            block()
        } finally {
            // Finalize the state machine on every exit path (normal or exceptional).
            interruptGuard.clearInterrupt()
        }
    } catch (interruption: InterruptedException) {
        throw CancellationException("Blocking call was interrupted due to parent cancellation").initCause(interruption)
    }
56
// States of the ThreadState state machine.
// The blocking code is running normally.
private const val WORKING = 0
// The blocking code has completed without being interrupted.
private const val FINISHED = 1
// The job was cancelled and the cancellation handler is about to interrupt the thread.
private const val INTERRUPTING = 2
// The thread has been interrupted by the cancellation handler.
private const val INTERRUPTED = 3
61
/**
 * Job node that connects cancellation of a job to interruption of the thread
 * that created this instance.
 *
 * Usage, as done by the caller in this file: construct on the thread that will run the
 * blocking code, call [setup] with the job, run the code, then call [clearInterrupt].
 */
private class ThreadState : JobNode() {
    /*
       === States ===

       WORKING:      running normally
       FINISHED:     completed normally
       INTERRUPTING: cancelled, about to interrupt the target thread
       INTERRUPTED:  the target thread has been interrupted

       === Possible transitions ===

       WORKING (no cancel handle) --setup: register on job--> WORKING (cancel handle stored)
              |                                                    |               |
              | cancel                                      cancel |               | complete
              v                                                    |               v
         INTERRUPTING <--------------------------------------------+           FINISHED
              |
              | interrupt
              v
         INTERRUPTED
     */
    private val _state = atomic(WORKING)

    // Thread that constructed this instance; it is the one interrupted on cancellation.
    private val targetThread = Thread.currentThread()

    // Handle of the registration made in setup(); disposed when the work finishes uninterrupted.
    private var cancelHandle: DisposableHandle? = null

    // NOTE(review): presumably makes the job call invoke() as soon as cancellation starts
    // rather than on final completion -- confirm against JobNode.
    override val onCancelling: Boolean get() = true

    /**
     * Registers this node as a handler on [job] and remembers the returned handle.
     *
     * Returns once the state is observed as [WORKING], or as [INTERRUPTING]/[INTERRUPTED]
     * when the handler has already fired; any other state is reported as illegal.
     */
    fun setup(job: Job) {
        cancelHandle = job.invokeOnCompletion(handler = this)
        // At this point either the handle is stored or cancellation has already kicked in.
        _state.loop { current ->
            when (current) {
                // Cancelled right away: nothing more to set up.
                INTERRUPTING, INTERRUPTED -> return
                // Happy path: still working, proceed.
                WORKING -> {
                    if (_state.compareAndSet(current, WORKING)) return
                }
                else -> invalidState(current)
            }
        }
    }

    /**
     * Finalizes the state machine after the blocking code has finished, so that an interrupt
     * issued by the cancellation handler does not leak out of the call.
     */
    fun clearInterrupt() {
        _state.loop { current ->
            when (current) {
                INTERRUPTED -> {
                    // The handler has interrupted this thread: reset the flag and leave.
                    Thread.interrupted()
                    return
                }
                INTERRUPTING -> {
                    /*
                     * The cancellation handler is interrupting the thread right now.
                     * Keep spinning until it reports INTERRUPTED, then clear the interrupt status.
                     */
                }
                WORKING -> {
                    // No cancellation so far: mark as finished and unregister from the job.
                    if (_state.compareAndSet(current, FINISHED)) {
                        cancelHandle?.dispose()
                        return
                    }
                }
                else -> invalidState(current)
            }
        }
    }

    /**
     * Cancellation handler: interrupts [targetThread] if the blocking code is still running.
     *
     * @param cause not used by this handler.
     */
    override fun invoke(cause: Throwable?) {
        _state.loop { current ->
            when (current) {
                // FINISHED: the work is already complete. INTERRUPTING/INTERRUPTED: already handled.
                FINISHED, INTERRUPTING, INTERRUPTED -> return
                // Still working: claim the transition first, then interrupt, then publish the result.
                WORKING -> if (_state.compareAndSet(current, INTERRUPTING)) {
                    targetThread.interrupt()
                    _state.value = INTERRUPTED
                    return
                }
                else -> invalidState(current)
            }
        }
    }

    // Fails with IllegalStateException for a state value that must never be observed.
    private fun invalidState(state: Int): Nothing {
        error("Illegal state $state")
    }
}
161