1 /*
2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.coroutines.Runnable
8 import kotlinx.coroutines.scheduling.*
9 import kotlinx.coroutines.scheduling.CoroutineScheduler
10
11 internal actual abstract class EventLoopImplPlatform: EventLoop() {
12 protected abstract val thread: Thread
13
unparknull14 protected actual fun unpark() {
15 val thread = thread // atomic read
16 if (Thread.currentThread() !== thread)
17 unpark(thread)
18 }
19
reschedulenull20 protected actual open fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) {
21 DefaultExecutor.schedule(now, delayedTask)
22 }
23 }
24
25 internal class BlockingEventLoop(
26 override val thread: Thread
27 ) : EventLoopImplBase()
28
createEventLoopnull29 internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())
30
31 /**
32 * Processes next event in the current thread's event loop.
33 *
34 * The result of this function is to be interpreted like this:
35 * * `<= 0` -- there are potentially more events for immediate processing;
36 * * `> 0` -- a number of nanoseconds to wait for the next scheduled event;
37 * * [Long.MAX_VALUE] -- no more events or no thread-local event loop.
38 *
39 * Sample usage of this function:
40 *
41 * ```
42 * while (waitingCondition) {
43 * val time = processNextEventInCurrentThread()
44 * LockSupport.parkNanos(time)
45 * }
46 * ```
47 *
48 * @suppress **This an internal API and should not be used from general code.**
49 */
50 @InternalCoroutinesApi
51 public fun processNextEventInCurrentThread(): Long =
52 // This API is used in Ktor for serverless integration where a single thread awaits a blocking call
53 // (and, to avoid actual blocking, does something via this call), see #850
54 ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE
55
56 internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()
57
58 /**
59 * Retrieves and executes a single task from the current system dispatcher ([Dispatchers.Default] or [Dispatchers.IO]).
60 * Returns `0` if any task was executed, `>= 0` for number of nanoseconds to wait until invoking this method again
61 * (implying that there will be a task to steal in N nanoseconds), `-1` if there is no tasks in the corresponding dispatcher at all.
62 *
63 * ### Invariants
64 *
65 * - When invoked from [Dispatchers.Default] **thread** (even if the actual context is different dispatcher,
66 * [CoroutineDispatcher.limitedParallelism] or any in-place wrapper), it runs an arbitrary task that ended
67 * up being scheduled to [Dispatchers.Default] or its counterpart. Tasks scheduled to [Dispatchers.IO]
68 * **are not** executed[1].
69 * - When invoked from [Dispatchers.IO] thread, the same rules apply, but for blocking tasks only.
70 *
71 * [1] -- this is purely technical limitation: the scheduler does not have "notify me when CPU token is available" API,
72 * and we cannot leave this method without leaving thread in its original state.
73 *
74 * ### Rationale
75 *
76 * This is an internal API that is intended to replace IDEA's core FJP decomposition.
77 * The following API is provided by IDEA core:
78 * ```
79 * runDecomposedTaskAndJoinIt { // <- non-suspending call
80 * // spawn as many tasks as needed
81 * // these tasks can also invoke 'runDecomposedTaskAndJoinIt'
82 * }
83 * ```
84 * The key observation here is that 'runDecomposedTaskAndJoinIt' can be invoked from `Dispatchers.Default` itself,
85 * thus blocking at least one thread. To avoid deadlocks and starvation during large hierarchical decompositions,
86 * 'runDecomposedTaskAndJoinIt' should not just block but also **help** execute the task or other tasks
87 * until an arbitrary condition is satisfied.
88 *
89 * See #3439 for additional details.
90 *
91 * ### Limitations and caveats
92 *
93 * - Executes tasks in-place, thus potentially leaking irrelevant thread-locals from the current thread
94 * - Is not 100% effective, because the caller should somehow "wait" (or do other work) for [Long] returned nanoseconds
95 * even when work arrives immediately after returning from this method.
96 * - When there is no more work, it's up to the caller to decide what to do. It's important to remember that
97 * work to current dispatcher may arrive **later** from external sources [1]
98 *
99 * [1] -- this is also a technicality that can be solved in kotlinx.coroutines itself, but unfortunately requires
100 * a tremendous effort.
101 *
102 * @throws IllegalStateException if the current thread is not system dispatcher thread
103 */
104 @InternalCoroutinesApi
105 @DelicateCoroutinesApi
106 @PublishedApi
107 internal fun runSingleTaskFromCurrentSystemDispatcher(): Long {
108 val thread = Thread.currentThread()
109 if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread")
110 return thread.runSingleTask()
111 }
112
113 /**
114 * Checks whether the given thread belongs to Dispatchers.IO.
115 * Note that feature "is part of the Dispatchers.IO" is *dynamic*, meaning that the thread
116 * may change this status when switching between tasks.
117 *
118 * This function is inteded to be used on the result of `Thread.currentThread()` for diagnostic
119 * purposes, and is declared as an extension only to avoid top-level scope pollution.
120 */
121 @InternalCoroutinesApi
122 @DelicateCoroutinesApi
123 @PublishedApi
isIoDispatcherThreadnull124 internal fun Thread.isIoDispatcherThread(): Boolean {
125 if (this !is CoroutineScheduler.Worker) return false
126 return isIo()
127 }
128
129