1 package kotlinx.coroutines
2
3 import kotlinx.coroutines.Runnable
4 import kotlinx.coroutines.scheduling.*
5 import kotlinx.coroutines.scheduling.CoroutineScheduler
6
/**
 * Platform-specific part of the event loop implementation that is associated with a [thread].
 */
internal actual abstract class EventLoopImplPlatform : EventLoop() {

    /** The thread that [unpark] wakes up. */
    protected abstract val thread: Thread

    /**
     * Unparks [thread], unless the caller is already running on that very thread.
     */
    protected actual fun unpark() {
        val target = thread // atomic read
        if (Thread.currentThread() === target) return
        unpark(target)
    }

    /**
     * Hands [delayedTask] over to [DefaultExecutor], passing [now] through unchanged.
     */
    protected actual open fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) {
        DefaultExecutor.schedule(now, delayedTask)
    }
}
21
/**
 * An [EventLoopImplBase] whose [thread] is supplied by the creator at construction time.
 */
internal class BlockingEventLoop(override val thread: Thread) : EventLoopImplBase()
25
/**
 * Creates a [BlockingEventLoop] for the thread that calls this function.
 */
internal actual fun createEventLoop(): EventLoop {
    val callerThread = Thread.currentThread()
    return BlockingEventLoop(callerThread)
}
27
/**
 * Processes the next event in the current thread's event loop.
 *
 * The returned value should be interpreted as follows:
 * - `<= 0` -- there are potentially more events for immediate processing;
 * - `> 0` -- the number of nanoseconds to wait for the next scheduled event;
 * - [Long.MAX_VALUE] -- no more events or no thread-local event loop.
 *
 * Sample usage of this function:
 *
 * ```
 * while (waitingCondition) {
 *     val time = processNextEventInCurrentThread()
 *     LockSupport.parkNanos(time)
 * }
 * ```
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public fun processNextEventInCurrentThread(): Long {
    // This API is used in Ktor for serverless integration where a single thread awaits a blocking call
    // (and, to avoid actual blocking, does something via this call), see #850
    val eventLoop = ThreadLocalEventLoop.currentOrNull() ?: return Long.MAX_VALUE
    return eventLoop.processNextEvent()
}
52
/**
 * Invokes [block] directly; this implementation adds nothing around the call.
 */
internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) {
    block()
}
54
/**
 * Retrieves and executes a single task from the current system dispatcher ([Dispatchers.Default] or [Dispatchers.IO]).
 * Returns `0` if any task was executed, `>= 0` for number of nanoseconds to wait until invoking this method again
 * (implying that there will be a task to steal in N nanoseconds), `-1` if there are no tasks in the corresponding
 * dispatcher at all.
 *
 * ### Invariants
 *
 * - When invoked from a [Dispatchers.Default] **thread** (even if the actual context is a different dispatcher,
 *   [CoroutineDispatcher.limitedParallelism] or any in-place wrapper), it runs an arbitrary task that ended
 *   up being scheduled to [Dispatchers.Default] or its counterpart. Tasks scheduled to [Dispatchers.IO]
 *   **are not** executed[1].
 * - When invoked from a [Dispatchers.IO] thread, the same rules apply, but for blocking tasks only.
 *
 * [1] -- this is a purely technical limitation: the scheduler does not have a "notify me when CPU token is
 * available" API, and we cannot leave this method without leaving the thread in its original state.
 *
 * ### Rationale
 *
 * This is an internal API that is intended to replace IDEA's core FJP decomposition.
 * The following API is provided by IDEA core:
 * ```
 * runDecomposedTaskAndJoinIt { // <- non-suspending call
 *     // spawn as many tasks as needed
 *     // these tasks can also invoke 'runDecomposedTaskAndJoinIt'
 * }
 * ```
 * The key observation here is that 'runDecomposedTaskAndJoinIt' can be invoked from `Dispatchers.Default` itself,
 * thus blocking at least one thread. To avoid deadlocks and starvation during large hierarchical decompositions,
 * 'runDecomposedTaskAndJoinIt' should not just block but also **help** execute the task or other tasks
 * until an arbitrary condition is satisfied.
 *
 * See #3439 for additional details.
 *
 * ### Limitations and caveats
 *
 * - Executes tasks in-place, thus potentially leaking irrelevant thread-locals from the current thread
 * - Is not 100% effective, because the caller should somehow "wait" (or do other work) for [Long] returned nanoseconds
 *   even when work arrives immediately after returning from this method.
 * - When there is no more work, it's up to the caller to decide what to do. It's important to remember that
 *   work to the current dispatcher may arrive **later** from external sources [1]
 *
 * [1] -- this is also a technicality that can be solved in kotlinx.coroutines itself, but unfortunately requires
 * a tremendous effort.
 *
 * @throws IllegalStateException if the current thread is not a system dispatcher thread
 */
@InternalCoroutinesApi
@DelicateCoroutinesApi
@PublishedApi
internal fun runSingleTaskFromCurrentSystemDispatcher(): Long {
    val thread = Thread.currentThread()
    // `check` throws IllegalStateException with the lazily built message and smart-casts `thread` on success.
    check(thread is CoroutineScheduler.Worker) { "Expected CoroutineScheduler.Worker, but got $thread" }
    return thread.runSingleTask()
}
109
/**
 * Checks whether the given thread belongs to Dispatchers.IO.
 * Note that being "part of the Dispatchers.IO" is a *dynamic* property, meaning that the thread
 * may change this status when switching between tasks.
 *
 * This function is intended to be used on the result of `Thread.currentThread()` for diagnostic
 * purposes, and is declared as an extension only to avoid top-level scope pollution.
 *
 * @return `false` when the receiver is not a [CoroutineScheduler.Worker]; otherwise the result of the worker's `isIo()`.
 */
@InternalCoroutinesApi
@DelicateCoroutinesApi
@PublishedApi
internal fun Thread.isIoDispatcherThread(): Boolean =
    this is CoroutineScheduler.Worker && isIo()
125
126