1 /*
<lambda>null2 * Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.atomicfu.*
8 import kotlin.coroutines.*
9
/**
 * Runs the given blocking [block] in the specified coroutine [context] in
 * [an interruptible manner](https://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html).
 * If the coroutine is cancelled while [block] is running, the thread executing [block] is interrupted
 * and this function throws [CancellationException].
 *
 * Example:
 *
 * ```
 * withTimeout(500L) {            // Cancels coroutine on timeout
 *     runInterruptible {         // Throws CancellationException if interrupted
 *         doSomethingBlocking()  // Interrupted on coroutines cancellation
 *     }
 * }
 * ```
 *
 * The optional [context] parameter of this function works just like the one of [withContext].
 * It enables single-call conversion of interruptible Java methods into suspending functions.
 * The single call below both moves the blocking call to [Dispatchers.IO] and makes it interruptible:
 *
 * ```
 * suspend fun <T> BlockingQueue<T>.awaitTake(): T =
 *     runInterruptible(Dispatchers.IO) { take() }
 * ```
 *
 * `runInterruptible` uses [withContext] as an underlying mechanism for switching context,
 * meaning that the supplied [block] is invoked in an [undispatched][CoroutineStart.UNDISPATCHED]
 * manner directly by the caller if [CoroutineDispatcher] from the current [coroutineContext][currentCoroutineContext]
 * is the same as the one supplied in [context].
 */
public suspend fun <T> runInterruptible(
    context: CoroutineContext = EmptyCoroutineContext,
    block: () -> T
): T {
    return withContext(context) {
        // `this` is the scope created by withContext, so its context already includes [context].
        runInterruptibleInExpectedContext(this.coroutineContext, block)
    }
}
46
/**
 * Invokes [block] on the current thread while a [ThreadState] bound to the job of [coroutineContext]
 * is set up, and always calls [ThreadState.clearInterrupt] once [block] has finished,
 * whether it returned normally or threw.
 *
 * An [InterruptedException] raised along the way is rethrown as a [CancellationException]
 * that carries the original exception as its cause.
 */
private fun <T> runInterruptibleInExpectedContext(coroutineContext: CoroutineContext, block: () -> T): T =
    try {
        val interruptionState = ThreadState(coroutineContext.job)
        interruptionState.setup()
        try {
            block()
        } finally {
            // Must run on every exit path so that a pending interrupt does not outlive the call.
            interruptionState.clearInterrupt()
        }
    } catch (interrupted: InterruptedException) {
        val cancellation = CancellationException("Blocking call was interrupted due to parent cancellation")
        throw cancellation.initCause(interrupted)
    }
60
// Values of ThreadState's atomic state field; the diagram inside ThreadState shows the possible transitions.
private const val WORKING = 0 // the blocking call is running normally
private const val FINISHED = 1 // the blocking call completed without being interrupted by the handler
private const val INTERRUPTING = 2 // cancellation handler is about to interrupt the target thread
private const val INTERRUPTED = 3 // cancellation handler has interrupted the target thread
65
/**
 * Links one blocking call, executed on the thread that creates this instance, to the given [job].
 *
 * The instance registers itself as a completion handler of [job] (see [setup]); when the handler
 * is invoked while the state is still `WORKING`, it interrupts the thread captured at construction.
 * [clearInterrupt] must be called after the blocking call to finish the state machine.
 */
private class ThreadState(private val job: Job) : CompletionHandler {
    /*
       === States ===

       WORKING: running normally
       FINISHED: completed normally
       INTERRUPTING: cancelled, going to interrupt this thread
       INTERRUPTED: this thread is interrupted

       === Possible Transitions ===

       +----------------+         register job       +-------------------------+
       |    WORKING     |   cancellation listener    |         WORKING         |
       | (thread, null) | -------------------------> | (thread, cancel handle) |
       +----------------+                            +-------------------------+
               |                                                |   |
               | cancel                                  cancel |   | complete
               |                                                |   |
               V                                                |   |
       +---------------+                                        |   |
       | INTERRUPTING  | <--------------------------------------+   |
       +---------------+                                            |
               |                                                    |
               | interrupt                                          |
               |                                                    |
               V                                                    V
       +---------------+                              +-------------------------+
       |  INTERRUPTED  |                              |         FINISHED        |
       +---------------+                              +-------------------------+
     */
    private val _state = atomic(WORKING)

    // The thread that constructed this instance; it is the one the cancellation handler interrupts.
    private val targetThread = Thread.currentThread()

    // Handle of the registered cancellation handler; null until setup() has run
    private var cancelHandle: DisposableHandle? = null

    /**
     * Registers this object as a handler of [job] that is invoked on cancelling as well as on completion,
     * and immediately if the job is already in such a state, then checks the resulting state.
     */
    fun setup() {
        cancelHandle = job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
        // Either we successfully stored it or it was immediately cancelled
        _state.loop { state ->
            when (state) {
                // Happy path, move forward. The CAS does not change the value,
                // it only succeeds if the state is still WORKING.
                WORKING -> if (_state.compareAndSet(state, WORKING)) return
                // Immediately cancelled, just continue
                INTERRUPTING, INTERRUPTED -> return
                // FINISHED (or any unknown value) is not expected before the blocking call has run
                else -> invalidState(state)
            }
        }
    }

    /**
     * Finishes the state machine after the blocking call: either moves `WORKING` to `FINISHED`
     * and disposes the cancellation handler, or waits for an in-progress interruption and then
     * clears the interrupt status of the current thread.
     */
    fun clearInterrupt() {
        /*
         * Do not allow an interrupt that has not been consumed to leak out of the blocking call
         */
        _state.loop { state ->
            when (state) {
                WORKING -> if (_state.compareAndSet(state, FINISHED)) {
                    // Not interrupted: the handler is no longer needed
                    cancelHandle?.dispose()
                    return
                }
                INTERRUPTING -> {
                    /*
                     * Spin: the cancellation mechanism is interrupting our thread right now,
                     * so we have to wait for it to finish and then clear the interrupt status
                     */
                }
                INTERRUPTED -> {
                    // Clear it and bail out
                    Thread.interrupted()
                    return
                }
                else -> invalidState(state)
            }
        }
    }

    // Cancellation handler
    override fun invoke(cause: Throwable?) {
        _state.loop { state ->
            when (state) {
                // WORKING -> try to transition the state and interrupt the thread
                WORKING -> {
                    if (_state.compareAndSet(state, INTERRUPTING)) {
                        // INTERRUPTED is published only after interrupt() has returned;
                        // clearInterrupt() spins while the state is INTERRUPTING.
                        targetThread.interrupt()
                        _state.value = INTERRUPTED
                        return
                    }
                }
                // FINISHED -- the blocking call is already complete;
                // INTERRUPTING, INTERRUPTED -- the interruption is already under way or done; ignore
                FINISHED, INTERRUPTING, INTERRUPTED -> return
                else -> invalidState(state)
            }
        }
    }

    // Fails with IllegalStateException (via error) for a state value that is not valid at the call site
    private fun invalidState(state: Int): Nothing = error("Illegal state $state")
}
163