• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines
2 
3 import kotlinx.coroutines.Runnable
4 import kotlinx.coroutines.scheduling.*
5 import kotlinx.coroutines.scheduling.CoroutineScheduler
6 
7 internal actual abstract class EventLoopImplPlatform: EventLoop() {
8 
9     protected abstract val thread: Thread
10 
unparknull11     protected actual fun unpark() {
12         val thread = thread // atomic read
13         if (Thread.currentThread() !== thread)
14             unpark(thread)
15     }
16 
reschedulenull17     protected actual open fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) {
18         DefaultExecutor.schedule(now, delayedTask)
19     }
20 }
21 
22 internal class BlockingEventLoop(
23     override val thread: Thread
24 ) : EventLoopImplBase()
25 
createEventLoopnull26 internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())
27 
28 /**
29  * Processes next event in the current thread's event loop.
30  *
31  * The result of this function is to be interpreted like this:
32  * - `<= 0` -- there are potentially more events for immediate processing;
33  * - `> 0` -- a number of nanoseconds to wait for the next scheduled event;
34  * - [Long.MAX_VALUE] -- no more events or no thread-local event loop.
35  *
36  * Sample usage of this function:
37  *
38  * ```
39  * while (waitingCondition) {
40  *     val time = processNextEventInCurrentThread()
41  *     LockSupport.parkNanos(time)
42  * }
43  * ```
44  *
45  * @suppress **This an internal API and should not be used from general code.**
46  */
47 @InternalCoroutinesApi
48 public fun processNextEventInCurrentThread(): Long =
49     // This API is used in Ktor for serverless integration where a single thread awaits a blocking call
50     // (and, to avoid actual blocking, does something via this call), see #850
51     ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE
52 
53 internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()
54 
55 /**
56  * Retrieves and executes a single task from the current system dispatcher ([Dispatchers.Default] or [Dispatchers.IO]).
57  * Returns `0` if any task was executed, `>= 0` for number of nanoseconds to wait until invoking this method again
58  * (implying that there will be a task to steal in N nanoseconds), `-1` if there is no tasks in the corresponding dispatcher at all.
59  *
60  * ### Invariants
61  *
62  * - When invoked from [Dispatchers.Default] **thread** (even if the actual context is different dispatcher,
63  *   [CoroutineDispatcher.limitedParallelism] or any in-place wrapper), it runs an arbitrary task that ended
64  *   up being scheduled to [Dispatchers.Default] or its counterpart. Tasks scheduled to [Dispatchers.IO]
65  *   **are not** executed[1].
66  * - When invoked from [Dispatchers.IO] thread, the same rules apply, but for blocking tasks only.
67  *
68  * [1] -- this is purely technical limitation: the scheduler does not have "notify me when CPU token is available" API,
69  * and we cannot leave this method without leaving thread in its original state.
70  *
71  * ### Rationale
72  *
73  * This is an internal API that is intended to replace IDEA's core FJP decomposition.
74  * The following API is provided by IDEA core:
75  * ```
76  * runDecomposedTaskAndJoinIt { // <- non-suspending call
77  *     // spawn as many tasks as needed
78  *     // these tasks can also invoke 'runDecomposedTaskAndJoinIt'
79  * }
80  * ```
81  * The key observation here is that 'runDecomposedTaskAndJoinIt' can be invoked from `Dispatchers.Default` itself,
82  * thus blocking at least one thread. To avoid deadlocks and starvation during large hierarchical decompositions,
83  * 'runDecomposedTaskAndJoinIt' should not just block but also **help** execute the task or other tasks
84  * until an arbitrary condition is satisfied.
85  *
86  * See #3439 for additional details.
87  *
88  * ### Limitations and caveats
89  *
90  * - Executes tasks in-place, thus potentially leaking irrelevant thread-locals from the current thread
91  * - Is not 100% effective, because the caller should somehow "wait" (or do other work) for [Long] returned nanoseconds
92  *   even when work arrives immediately after returning from this method.
93  * - When there is no more work, it's up to the caller to decide what to do. It's important to remember that
94  *   work to current dispatcher may arrive **later** from external sources [1]
95  *
96  * [1] -- this is also a technicality that can be solved in kotlinx.coroutines itself, but unfortunately requires
97  *        a tremendous effort.
98  *
99  * @throws IllegalStateException if the current thread is not system dispatcher thread
100  */
101 @InternalCoroutinesApi
102 @DelicateCoroutinesApi
103 @PublishedApi
104 internal fun runSingleTaskFromCurrentSystemDispatcher(): Long {
105     val thread = Thread.currentThread()
106     if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread")
107     return thread.runSingleTask()
108 }
109 
110 /**
111  * Checks whether the given thread belongs to Dispatchers.IO.
112  * Note that feature "is part of the Dispatchers.IO" is *dynamic*, meaning that the thread
113  * may change this status when switching between tasks.
114  *
115  * This function is inteded to be used on the result of `Thread.currentThread()` for diagnostic
116  * purposes, and is declared as an extension only to avoid top-level scope pollution.
117  */
118 @InternalCoroutinesApi
119 @DelicateCoroutinesApi
120 @PublishedApi
isIoDispatcherThreadnull121 internal fun Thread.isIoDispatcherThread(): Boolean {
122     if (this !is CoroutineScheduler.Worker) return false
123     return isIo()
124 }
125 
126