1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.atomicfu.*
8 import kotlin.coroutines.*
9
/**
 * Runs the blocking [block] in the given coroutine [context] in such a way that cancellation of the
 * coroutine interrupts the blocking code.
 * When that happens, this function throws [CancellationException].
 *
 * Example:
 *
 * ```
 * withTimeout(500L) {            // Cancels coroutine on timeout
 *     runInterruptible {         // Throws CancellationException if interrupted
 *         doSomethingBlocking()  // Interrupted on coroutines cancellation
 *     }
 * }
 * ```
 *
 * The optional [context] parameter works just like the one of [withContext], which makes it possible
 * to turn an interruptible Java method into a suspending function with a single call.
 * The snippet below both moves the call to [Dispatchers.IO] and makes it interruptible:
 *
 * ```
 * suspend fun <T> BlockingQueue<T>.awaitTake(): T =
 *     runInterruptible(Dispatchers.IO) { take() }
 * ```
 *
 * Context switching is delegated to [withContext]. Consequently, the supplied [block] is invoked in an
 * [undispatched][CoroutineStart.UNDISPATCHED] manner directly by the caller when the [CoroutineDispatcher]
 * of the current [coroutineContext][currentCoroutineContext] is the same as the one supplied in [context].
 *
 * @param context the context to run [block] in; defaults to [EmptyCoroutineContext].
 * @param block the blocking, non-suspending code to execute.
 * @return the value produced by [block].
 * @throws CancellationException if [block] terminates with an [InterruptedException].
 */
public suspend fun <T> runInterruptible(
    context: CoroutineContext = EmptyCoroutineContext,
    block: () -> T
): T {
    return withContext(context) {
        // `coroutineContext` here is the context of the scope created by withContext.
        runInterruptibleInExpectedContext(coroutineContext, block)
    }
}
45
/**
 * Invokes [block] on the current thread while a [ThreadState] bound to the job of [coroutineContext]
 * is set up around it.
 *
 * An [InterruptedException] escaping from the set-up or from [block] is rethrown as a
 * [CancellationException] that carries the original exception as its cause.
 */
private fun <T> runInterruptibleInExpectedContext(coroutineContext: CoroutineContext, block: () -> T): T {
    return try {
        // Construction captures the current thread; setup() registers the cancellation handler.
        val interruptGuard = ThreadState(coroutineContext.job).also { it.setup() }
        try {
            block()
        } finally {
            // Runs only when setup() succeeded, on both normal and exceptional exit of block().
            interruptGuard.clearInterrupt()
        }
    } catch (interruption: InterruptedException) {
        val cancellation = CancellationException("Blocking call was interrupted due to parent cancellation")
        cancellation.initCause(interruption)
        throw cancellation
    }
}
59
// States of ThreadState's atomic `_state` field.
// The blocking block is running (initial state).
private const val WORKING = 0
// The blocking block finished before any cancellation was observed.
private const val FINISHED = 1
// The cancellation handler won the race and is about to interrupt the target thread.
private const val INTERRUPTING = 2
// The cancellation handler has called interrupt() on the target thread.
private const val INTERRUPTED = 3
64
/**
 * Small state machine that coordinates a thread running a blocking block with the cancellation
 * handler of [job].
 *
 * The instance remembers the thread it was constructed on ([targetThread]) and registers itself as
 * a completion handler of [job] in [setup]. The state is kept in a single atomic integer and is
 * changed with compare-and-set by both the running thread ([clearInterrupt]) and the handler ([invoke]).
 */
private class ThreadState(private val job: Job) : CompletionHandler {
    /*
       === States ===

       WORKING: running normally
       FINISHED: complete normally
       INTERRUPTING: canceled, going to interrupt this thread
       INTERRUPTED: this thread is interrupted

       === Possible Transitions ===

       +----------------+         register job       +-------------------------+
       |    WORKING     |   cancellation listener    |         WORKING         |
       | (thread, null) | -------------------------> | (thread, cancel handle) |
       +----------------+                            +-------------------------+
               |                                                |   |
               | cancel                                  cancel |   | complete
               |                                                |   |
               V                                                |   |
       +---------------+                                        |   |
       | INTERRUPTING  | <--------------------------------------+   |
       +---------------+                                            |
               |                                                    |
               | interrupt                                          |
               |                                                    |
               V                                                    V
       +---------------+                              +-------------------------+
       |  INTERRUPTED  |                              |        FINISHED         |
       +---------------+                              +-------------------------+

       NOTE(review): the "(thread, ...)" pairs in the diagram do not correspond to the Int stored
       in `_state`; presumably they describe `targetThread` and `cancelHandle` - confirm.
     */
    private val _state = atomic(WORKING)
    // The thread that constructed this instance; it is the one the handler interrupts.
    private val targetThread = Thread.currentThread()

    // Registered cancellation handler
    private var cancelHandle: DisposableHandle? = null

    /**
     * Registers this object as a handler of [job] and stores the returned handle in [cancelHandle].
     *
     * Returns normally when the state is WORKING, INTERRUPTING or INTERRUPTED; any other state
     * is reported through [invalidState].
     */
    fun setup() {
        cancelHandle = job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
        // Either we successfully stored it or it was immediately cancelled
        _state.loop { state ->
            when (state) {
                // Happy-path, move forward
                // NOTE(review): this CAS replaces WORKING with WORKING, so the value is unchanged;
                // presumably a leftover from an earlier representation of the state - confirm.
                WORKING -> if (_state.compareAndSet(state, WORKING)) return
                // Immediately cancelled, just continue
                INTERRUPTING, INTERRUPTED -> return
                else -> invalidState(state)
            }
        }
    }

    /**
     * Finishes the state machine once the blocking block is done.
     *
     * If no cancellation was observed, moves WORKING -> FINISHED and disposes [cancelHandle].
     * If the handler has interrupted the thread, clears the interrupt flag via [Thread.interrupted].
     */
    fun clearInterrupt() {
        /*
         * Do not allow an untriggered interrupt to leak
         */
        _state.loop { state ->
            when (state) {
                WORKING -> if (_state.compareAndSet(state, FINISHED)) {
                    cancelHandle?.dispose()
                    return
                }
                INTERRUPTING -> {
                    /*
                     * Spin, cancellation mechanism is interrupting our thread right now
                     * and we have to wait for it and then clear the interrupt status
                     */
                }
                INTERRUPTED -> {
                    // Clear it and bail out
                    Thread.interrupted()
                    return
                }
                else -> invalidState(state)
            }
        }
    }

    /**
     * Cancellation handler: if the state is still WORKING, claims it with a CAS to INTERRUPTING,
     * interrupts [targetThread] and then publishes INTERRUPTED. Does nothing in any other known state.
     */
    // Cancellation handler
    override fun invoke(cause: Throwable?) {
        _state.loop { state ->
            when (state) {
                // Working -> try to transition the state and interrupt the thread
                WORKING -> {
                    if (_state.compareAndSet(state, INTERRUPTING)) {
                        targetThread.interrupt()
                        // Published only after interrupt(); clearInterrupt() spins until it sees this value.
                        _state.value = INTERRUPTED
                        return
                    }
                }
                // Finished -- runInterruptible is already complete, INTERRUPTING - ignore
                FINISHED, INTERRUPTING, INTERRUPTED -> return
                else -> invalidState(state)
            }
        }
    }

    // Fails with IllegalStateException (via error()) for a state value that no branch expects.
    private fun invalidState(state: Int): Nothing = error("Illegal state $state")
}
162