• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.Runnable
8 import kotlinx.coroutines.scheduling.*
9 import kotlinx.coroutines.scheduling.CoroutineScheduler
10 
11 internal actual abstract class EventLoopImplPlatform: EventLoop() {
12     protected abstract val thread: Thread
13 
unparknull14     protected actual fun unpark() {
15         val thread = thread // atomic read
16         if (Thread.currentThread() !== thread)
17             unpark(thread)
18     }
19 
reschedulenull20     protected actual open fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) {
21         DefaultExecutor.schedule(now, delayedTask)
22     }
23 }
24 
25 internal class BlockingEventLoop(
26     override val thread: Thread
27 ) : EventLoopImplBase()
28 
createEventLoopnull29 internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())
30 
31 /**
32  * Processes next event in the current thread's event loop.
33  *
34  * The result of this function is to be interpreted like this:
35  * * `<= 0` -- there are potentially more events for immediate processing;
36  * * `> 0` -- a number of nanoseconds to wait for the next scheduled event;
37  * * [Long.MAX_VALUE] -- no more events or no thread-local event loop.
38  *
39  * Sample usage of this function:
40  *
41  * ```
42  * while (waitingCondition) {
43  *     val time = processNextEventInCurrentThread()
44  *     LockSupport.parkNanos(time)
45  * }
46  * ```
47  *
48  * @suppress **This an internal API and should not be used from general code.**
49  */
50 @InternalCoroutinesApi
51 public fun processNextEventInCurrentThread(): Long =
52     // This API is used in Ktor for serverless integration where a single thread awaits a blocking call
53     // (and, to avoid actual blocking, does something via this call), see #850
54     ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE
55 
56 internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()
57 
58 /**
59  * Retrieves and executes a single task from the current system dispatcher ([Dispatchers.Default] or [Dispatchers.IO]).
60  * Returns `0` if any task was executed, `>= 0` for number of nanoseconds to wait until invoking this method again
61  * (implying that there will be a task to steal in N nanoseconds), `-1` if there is no tasks in the corresponding dispatcher at all.
62  *
63  * ### Invariants
64  *
65  *  - When invoked from [Dispatchers.Default] **thread** (even if the actual context is different dispatcher,
66  *    [CoroutineDispatcher.limitedParallelism] or any in-place wrapper), it runs an arbitrary task that ended
67  *    up being scheduled to [Dispatchers.Default] or its counterpart. Tasks scheduled to [Dispatchers.IO]
68  *    **are not** executed[1].
69  *  - When invoked from [Dispatchers.IO] thread, the same rules apply, but for blocking tasks only.
70  *
71  * [1] -- this is purely technical limitation: the scheduler does not have "notify me when CPU token is available" API,
72  * and we cannot leave this method without leaving thread in its original state.
73  *
74  * ### Rationale
75  *
76  * This is an internal API that is intended to replace IDEA's core FJP decomposition.
77  * The following API is provided by IDEA core:
78  * ```
79  * runDecomposedTaskAndJoinIt { // <- non-suspending call
80  *     // spawn as many tasks as needed
81  *     // these tasks can also invoke 'runDecomposedTaskAndJoinIt'
82  * }
83  * ```
84  * The key observation here is that 'runDecomposedTaskAndJoinIt' can be invoked from `Dispatchers.Default` itself,
85  * thus blocking at least one thread. To avoid deadlocks and starvation during large hierarchical decompositions,
86  * 'runDecomposedTaskAndJoinIt' should not just block but also **help** execute the task or other tasks
87  * until an arbitrary condition is satisfied.
88  *
89  * See #3439 for additional details.
90  *
91  * ### Limitations and caveats
92  *
93  * - Executes tasks in-place, thus potentially leaking irrelevant thread-locals from the current thread
94  * - Is not 100% effective, because the caller should somehow "wait" (or do other work) for [Long] returned nanoseconds
95  *   even when work arrives immediately after returning from this method.
96  * - When there is no more work, it's up to the caller to decide what to do. It's important to remember that
97  *   work to current dispatcher may arrive **later** from external sources [1]
98  *
99  * [1] -- this is also a technicality that can be solved in kotlinx.coroutines itself, but unfortunately requires
100  *        a tremendous effort.
101  *
102  * @throws IllegalStateException if the current thread is not system dispatcher thread
103  */
104 @InternalCoroutinesApi
105 @DelicateCoroutinesApi
106 @PublishedApi
107 internal fun runSingleTaskFromCurrentSystemDispatcher(): Long {
108     val thread = Thread.currentThread()
109     if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread")
110     return thread.runSingleTask()
111 }
112 
113 /**
114  * Checks whether the given thread belongs to Dispatchers.IO.
115  * Note that feature "is part of the Dispatchers.IO" is *dynamic*, meaning that the thread
116  * may change this status when switching between tasks.
117  *
118  * This function is inteded to be used on the result of `Thread.currentThread()` for diagnostic
119  * purposes, and is declared as an extension only to avoid top-level scope pollution.
120  */
121 @InternalCoroutinesApi
122 @DelicateCoroutinesApi
123 @PublishedApi
isIoDispatcherThreadnull124 internal fun Thread.isIoDispatcherThread(): Boolean {
125     if (this !is CoroutineScheduler.Worker) return false
126     return isIo()
127 }
128 
129