• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.internal.*
8 import java.util.concurrent.*
9 import kotlin.coroutines.*
10 
/**
 * Value of the `kotlinx.coroutines.main.delay` system property, `false` when the property is not set.
 * When `false`, [initializeDefaultDelay] always selects [DefaultExecutor].
 */
private val defaultMainDelayOptIn: Boolean = systemProp("kotlinx.coroutines.main.delay", false)

/**
 * The [Delay] implementation used by default; computed once by [initializeDefaultDelay].
 */
internal actual val DefaultDelay: Delay = initializeDefaultDelay()
14 
/**
 * Selects the [Delay] implementation backing [DefaultDelay].
 *
 * [DefaultExecutor] is returned unless the `kotlinx.coroutines.main.delay` flag is enabled
 * and `Dispatchers.Main` is both present and implements [Delay], in which case the main
 * dispatcher itself is returned.
 */
private fun initializeDefaultDelay(): Delay {
    // Without the flag, Dispatchers.Main is not even touched.
    if (!defaultMainDelayOptIn) return DefaultExecutor
    val mainDispatcher = Dispatchers.Main
    /*
     * When we are already working with UI and Main threads, it makes
     * no sense to create a separate timer thread that cannot be controlled
     * by the UI runtime.
     */
    return when {
        mainDispatcher.isMissing() -> DefaultExecutor
        mainDispatcher is Delay -> mainDispatcher
        else -> DefaultExecutor
    }
}
26 
/**
 * Event loop that runs on its own lazily created daemon thread named [THREAD_NAME].
 *
 * The thread is started on demand by the [thread] getter and exits from [run] once the loop has
 * had nothing to process for the keep-alive interval, or once a shutdown has been requested.
 * [debugStatus] tracks the lifecycle (`FRESH`, `ACTIVE`, `SHUTDOWN_REQ`, `SHUTDOWN_ACK`, `SHUTDOWN`);
 * status changes that other threads wait on are made under the monitor of this object and
 * signalled with `notifyAll`.
 */
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
    const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor"

    init {
        incrementUseCount() // this event loop is never completed
    }

    private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds

    // Idle time after which the worker thread exits, in nanoseconds. Read from the
    // "kotlinx.coroutines.DefaultExecutor.keepAlive" system property (milliseconds);
    // falls back to the default when reading the property is not permitted.
    private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
        try {
            java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS)
        } catch (e: SecurityException) {
            DEFAULT_KEEP_ALIVE_MS
        })

    // Current worker thread, or null when no worker is alive (reset at the end of run()).
    @Suppress("ObjectPropertyName")
    @Volatile
    private var _thread: Thread? = null

    /** The worker thread; created and started on first access when none is alive. */
    override val thread: Thread
        get() = _thread ?: createThreadSync()

    // Values of debugStatus.
    private const val FRESH = 0        // initial state; also restored by ensureStarted/shutdownForTests
    private const val ACTIVE = 1       // set by notifyStartup once the worker has started
    private const val SHUTDOWN_REQ = 2 // set by shutdownForTests to ask the worker to stop
    private const val SHUTDOWN_ACK = 3 // set by the worker when it honours a shutdown request
    private const val SHUTDOWN = 4     // set by shutdown(); enqueue is rejected from then on

    @Volatile
    private var debugStatus: Int = FRESH

    private val isShutDown: Boolean get() = debugStatus == SHUTDOWN

    private val isShutdownRequested: Boolean get() {
        // Read the volatile field once so both comparisons see the same value.
        val debugStatus = debugStatus
        return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
    }

    /**
     * Enqueues [task] for execution.
     *
     * @throws RejectedExecutionException if [shutdown] has already been invoked.
     */
    actual override fun enqueue(task: Runnable) {
        if (isShutDown) shutdownError()
        super.enqueue(task)
    }

    /**
     * Always fails with [RejectedExecutionException].
     */
    override fun reschedule(now: Long, delayedTask: DelayedTask) {
        // Reschedule on default executor can only be invoked after Dispatchers.shutdown
        shutdownError()
    }

    /** Throws the [RejectedExecutionException] that reports use of the executor after shutdown. */
    private fun shutdownError() {
        throw RejectedExecutionException("DefaultExecutor was shut down. " +
            "This error indicates that Dispatchers.shutdown() was invoked prior to completion of exiting coroutines, leaving coroutines in incomplete state. " +
            "Please refer to Dispatchers.shutdown documentation for more details")
    }

    /** Marks the executor as shut down, so later [enqueue] calls are rejected, then delegates to the base class. */
    override fun shutdown() {
        debugStatus = SHUTDOWN
        super.shutdown()
    }

    /**
     * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
     * ```
     * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } }
     * ```
     *
     * Livelock is possible only if `runBlocking` is called on internal default executor (which is used by default [delay]),
     * but it's not exposed as public API.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        scheduleInvokeOnTimeout(timeMillis, block)

    /**
     * Body of the worker thread: processes events until the loop has been idle for
     * [KEEP_ALIVE_NANOS] or a shutdown is requested, then clears [_thread]. If the queues are
     * not empty at that point, a new worker thread is started.
     */
    override fun run() {
        ThreadLocalEventLoop.setEventLoop(this)
        registerTimeLoopThread()
        try {
            // Long.MAX_VALUE means "no idle deadline armed yet".
            var shutdownNanos = Long.MAX_VALUE
            if (!notifyStartup()) return
            while (true) {
                Thread.interrupted() // just reset interruption flag
                var parkNanos = processNextEvent()
                if (parkNanos == Long.MAX_VALUE) {
                    // nothing to do, initialize shutdown timeout
                    val now = nanoTime()
                    if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
                    val tillShutdown = shutdownNanos - now
                    if (tillShutdown <= 0) return // shut thread down
                    parkNanos = parkNanos.coerceAtMost(tillShutdown)
                } else
                    shutdownNanos = Long.MAX_VALUE // there is pending work, disarm the idle deadline
                if (parkNanos > 0) {
                    // check if shutdown was requested and bail out in this case
                    if (isShutdownRequested) return
                    parkNanos(this, parkNanos)
                }
            }
        } finally {
            _thread = null // this thread is dead
            acknowledgeShutdownIfNeeded()
            unregisterTimeLoopThread()
            // recheck if queues are empty after _thread reference was set to null (!!!)
            if (!isEmpty) thread // recreate thread if it is needed
        }
    }

    /**
     * Returns the current worker thread, creating and starting a new daemon thread when none
     * exists. Synchronized so that concurrent callers do not start more than one thread.
     */
    @Synchronized
    private fun createThreadSync(): Thread {
        return _thread ?: Thread(this, THREAD_NAME).apply {
            _thread = this
            isDaemon = true
            start()
        }
    }

    // used for tests
    /**
     * Starts a fresh worker thread and blocks until it leaves the `FRESH` state.
     * Expects (via assertions) that no worker is alive and that the status is
     * `FRESH` or `SHUTDOWN_ACK`.
     */
    @Synchronized
    internal fun ensureStarted() {
        assert { _thread == null } // ensure we are at a clean state
        assert { debugStatus == FRESH || debugStatus == SHUTDOWN_ACK }
        debugStatus = FRESH
        createThreadSync() // create fresh thread
        // The worker signals from notifyStartup(); loop to tolerate spurious wakeups.
        while (debugStatus == FRESH) (this as Object).wait()
    }

    /**
     * Called by the worker at the start of [run]. Switches the status to `ACTIVE` and wakes up
     * waiters; returns `false` without changing anything when a shutdown was already requested.
     */
    @Synchronized
    private fun notifyStartup(): Boolean {
        if (isShutdownRequested) return false
        debugStatus = ACTIVE
        (this as Object).notifyAll()
        return true
    }

    /**
     * Requests the worker thread to stop and waits for it to acknowledge, for at most
     * [timeout] milliseconds in total. The status is reset to `FRESH` afterwards regardless of
     * whether the acknowledgement arrived in time.
     */
    @Synchronized // used _only_ for tests
    fun shutdownForTests(timeout: Long) {
        val deadline = System.currentTimeMillis() + timeout
        if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
        // loop while there is anything to do immediately or deadline passes
        while (debugStatus != SHUTDOWN_ACK && _thread != null) {
            _thread?.let { unpark(it) } // wake up thread if present
            val remaining = deadline - System.currentTimeMillis()
            if (remaining <= 0) break
            // Wait only for the time left until the deadline, so that repeated wakeups
            // cannot stretch the total wait beyond the requested timeout.
            (this as Object).wait(remaining)
        }
        // restore fresh status
        debugStatus = FRESH
    }

    /**
     * Called by the worker when it exits. If a shutdown was requested, marks it as
     * acknowledged, clears the queues and wakes up threads waiting in [shutdownForTests].
     */
    @Synchronized
    private fun acknowledgeShutdownIfNeeded() {
        if (!isShutdownRequested) return
        debugStatus = SHUTDOWN_ACK
        resetAll() // clear queues
        (this as Object).notifyAll()
    }

    /** `true` while a worker thread reference is set. */
    internal val isThreadPresent
        get() = _thread != null
}
186