1 /*
2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.coroutines.internal.*
8 import java.util.concurrent.*
9 import kotlin.coroutines.*
10
11 private val defaultMainDelayOptIn = systemProp("kotlinx.coroutines.main.delay", false)
12
13 internal actual val DefaultDelay: Delay = initializeDefaultDelay()
14
initializeDefaultDelaynull15 private fun initializeDefaultDelay(): Delay {
16 // Opt-out flag
17 if (!defaultMainDelayOptIn) return DefaultExecutor
18 val main = Dispatchers.Main
19 /*
20 * When we already are working with UI and Main threads, it makes
21 * no sense to create a separate thread with timer that cannot be controller
22 * by the UI runtime.
23 */
24 return if (main.isMissing() || main !is Delay) DefaultExecutor else main
25 }
26
27 @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
28 internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
29 const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor"
30
31 init {
32 incrementUseCount() // this event loop is never completed
33 }
34
35 private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds
36
37 private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
38 try {
39 java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS)
40 } catch (e: SecurityException) {
41 DEFAULT_KEEP_ALIVE_MS
42 })
43
44 @Suppress("ObjectPropertyName")
45 @Volatile
46 private var _thread: Thread? = null
47
48 override val thread: Thread
49 get() = _thread ?: createThreadSync()
50
51 private const val FRESH = 0
52 private const val ACTIVE = 1
53 private const val SHUTDOWN_REQ = 2
54 private const val SHUTDOWN_ACK = 3
55 private const val SHUTDOWN = 4
56
57 @Volatile
58 private var debugStatus: Int = FRESH
59
60 private val isShutDown: Boolean get() = debugStatus == SHUTDOWN
61
62 private val isShutdownRequested: Boolean get() {
63 val debugStatus = debugStatus
64 return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
65 }
66
enqueuenull67 actual override fun enqueue(task: Runnable) {
68 if (isShutDown) shutdownError()
69 super.enqueue(task)
70 }
71
reschedulenull72 override fun reschedule(now: Long, delayedTask: DelayedTask) {
73 // Reschedule on default executor can only be invoked after Dispatchers.shutdown
74 shutdownError()
75 }
76
shutdownErrornull77 private fun shutdownError() {
78 throw RejectedExecutionException("DefaultExecutor was shut down. " +
79 "This error indicates that Dispatchers.shutdown() was invoked prior to completion of exiting coroutines, leaving coroutines in incomplete state. " +
80 "Please refer to Dispatchers.shutdown documentation for more details")
81 }
82
shutdownnull83 override fun shutdown() {
84 debugStatus = SHUTDOWN
85 super.shutdown()
86 }
87
88 /**
89 * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
90 * ```
91 * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } }
92 * ```
93 *
94 * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]),
95 * but it's not exposed as public API.
96 */
invokeOnTimeoutnull97 override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
98 scheduleInvokeOnTimeout(timeMillis, block)
99
100 override fun run() {
101 ThreadLocalEventLoop.setEventLoop(this)
102 registerTimeLoopThread()
103 try {
104 var shutdownNanos = Long.MAX_VALUE
105 if (!notifyStartup()) return
106 while (true) {
107 Thread.interrupted() // just reset interruption flag
108 var parkNanos = processNextEvent()
109 if (parkNanos == Long.MAX_VALUE) {
110 // nothing to do, initialize shutdown timeout
111 val now = nanoTime()
112 if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
113 val tillShutdown = shutdownNanos - now
114 if (tillShutdown <= 0) return // shut thread down
115 parkNanos = parkNanos.coerceAtMost(tillShutdown)
116 } else
117 shutdownNanos = Long.MAX_VALUE
118 if (parkNanos > 0) {
119 // check if shutdown was requested and bail out in this case
120 if (isShutdownRequested) return
121 parkNanos(this, parkNanos)
122 }
123 }
124 } finally {
125 _thread = null // this thread is dead
126 acknowledgeShutdownIfNeeded()
127 unregisterTimeLoopThread()
128 // recheck if queues are empty after _thread reference was set to null (!!!)
129 if (!isEmpty) thread // recreate thread if it is needed
130 }
131 }
132
133 @Synchronized
createThreadSyncnull134 private fun createThreadSync(): Thread {
135 return _thread ?: Thread(this, THREAD_NAME).apply {
136 _thread = this
137 isDaemon = true
138 start()
139 }
140 }
141
142 // used for tests
143 @Synchronized
ensureStartednull144 internal fun ensureStarted() {
145 assert { _thread == null } // ensure we are at a clean state
146 assert { debugStatus == FRESH || debugStatus == SHUTDOWN_ACK }
147 debugStatus = FRESH
148 createThreadSync() // create fresh thread
149 while (debugStatus == FRESH) (this as Object).wait()
150 }
151
152 @Synchronized
notifyStartupnull153 private fun notifyStartup(): Boolean {
154 if (isShutdownRequested) return false
155 debugStatus = ACTIVE
156 (this as Object).notifyAll()
157 return true
158 }
159
160 @Synchronized // used _only_ for tests
shutdownForTestsnull161 fun shutdownForTests(timeout: Long) {
162 val deadline = System.currentTimeMillis() + timeout
163 if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
164 // loop while there is anything to do immediately or deadline passes
165 while (debugStatus != SHUTDOWN_ACK && _thread != null) {
166 _thread?.let { unpark(it) } // wake up thread if present
167 val remaining = deadline - System.currentTimeMillis()
168 if (remaining <= 0) break
169 (this as Object).wait(timeout)
170 }
171 // restore fresh status
172 debugStatus = FRESH
173 }
174
175 @Synchronized
acknowledgeShutdownIfNeedednull176 private fun acknowledgeShutdownIfNeeded() {
177 if (!isShutdownRequested) return
178 debugStatus = SHUTDOWN_ACK
179 resetAll() // clear queues
180 (this as Object).notifyAll()
181 }
182
183 internal val isThreadPresent
184 get() = _thread != null
185 }
186