1 /*
<lambda>null2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.internal.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11
12 /**
13 * Extended by [CoroutineDispatcher] implementations that have event loop inside and can
14 * be asked to process next event from their event queue.
15 *
16 * It may optionally implement [Delay] interface and support time-scheduled tasks.
17 * It is created or pigged back onto (see [ThreadLocalEventLoop])
18 * by `runBlocking` and by [Dispatchers.Unconfined].
19 *
20 * @suppress **This an internal API and should not be used from general code.**
21 */
22 internal abstract class EventLoop : CoroutineDispatcher() {
23 /**
24 * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop.
25 */
26 private var useCount = 0L
27
28 /**
29 * Set to true on any use by `runBlocking`, because it potentially leaks this loop to other threads, so
30 * this instance must be properly shutdown. We don't need to shutdown event loop that was used solely
31 * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
32 */
33 private var shared = false
34
35 /**
36 * Queue used by [Dispatchers.Unconfined] tasks.
37 * These tasks are thread-local for performance and take precedence over the rest of the queue.
38 */
39 private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
40
41 /**
42 * Processes next event in this event loop.
43 *
44 * The result of this function is to be interpreted like this:
45 * * `<= 0` -- there are potentially more events for immediate processing;
46 * * `> 0` -- a number of nanoseconds to wait for next scheduled event;
47 * * [Long.MAX_VALUE] -- no more events.
48 *
49 * **NOTE**: Must be invoked only from the event loop's thread
50 * (no check for performance reasons, may be added in the future).
51 */
52 public open fun processNextEvent(): Long {
53 if (!processUnconfinedEvent()) return Long.MAX_VALUE
54 return nextTime
55 }
56
57 protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
58
59 protected open val nextTime: Long
60 get() {
61 val queue = unconfinedQueue ?: return Long.MAX_VALUE
62 return if (queue.isEmpty) Long.MAX_VALUE else 0L
63 }
64
65 public fun processUnconfinedEvent(): Boolean {
66 val queue = unconfinedQueue ?: return false
67 val task = queue.removeFirstOrNull() ?: return false
68 task.run()
69 return true
70 }
71 /**
72 * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
73 * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
74 * By default, event loop implementation is thread-local and should not processed in the context
75 * (current thread's event loop should be processed instead).
76 */
77 public open fun shouldBeProcessedFromContext(): Boolean = false
78
79 /**
80 * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
81 * into the current event loop.
82 */
83 public fun dispatchUnconfined(task: DispatchedTask<*>) {
84 val queue = unconfinedQueue ?:
85 ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
86 queue.addLast(task)
87 }
88
89 public val isActive: Boolean
90 get() = useCount > 0
91
92 public val isUnconfinedLoopActive: Boolean
93 get() = useCount >= delta(unconfined = true)
94
95 // May only be used from the event loop's thread
96 public val isUnconfinedQueueEmpty: Boolean
97 get() = unconfinedQueue?.isEmpty ?: true
98
99 private fun delta(unconfined: Boolean) =
100 if (unconfined) (1L shl 32) else 1L
101
102 fun incrementUseCount(unconfined: Boolean = false) {
103 useCount += delta(unconfined)
104 if (!unconfined) shared = true
105 }
106
107 fun decrementUseCount(unconfined: Boolean = false) {
108 useCount -= delta(unconfined)
109 if (useCount > 0) return
110 assert { useCount == 0L } // "Extra decrementUseCount"
111 if (shared) {
112 // shut it down and remove from ThreadLocalEventLoop
113 shutdown()
114 }
115 }
116
117 protected open fun shutdown() {}
118 }
119
120 @NativeThreadLocal
121 internal object ThreadLocalEventLoop {
122 private val ref = CommonThreadLocal<EventLoop?>()
123
124 internal val eventLoop: EventLoop
<lambda>null125 get() = ref.get() ?: createEventLoop().also { ref.set(it) }
126
currentOrNullnull127 internal fun currentOrNull(): EventLoop? =
128 ref.get()
129
130 internal fun resetEventLoop() {
131 ref.set(null)
132 }
133
setEventLoopnull134 internal fun setEventLoop(eventLoop: EventLoop) {
135 ref.set(eventLoop)
136 }
137 }
138
139 @SharedImmutable
140 private val DISPOSED_TASK = Symbol("REMOVED_TASK")
141
142 // results for scheduleImpl
143 private const val SCHEDULE_OK = 0
144 private const val SCHEDULE_COMPLETED = 1
145 private const val SCHEDULE_DISPOSED = 2
146
147 private const val MS_TO_NS = 1_000_000L
148 private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS
149
150 /**
151 * First-line overflow protection -- limit maximal delay.
152 * Delays longer than this one (~146 years) are considered to be delayed "forever".
153 */
154 private const val MAX_DELAY_NS = Long.MAX_VALUE / 2
155
delayToNanosnull156 internal fun delayToNanos(timeMillis: Long): Long = when {
157 timeMillis <= 0 -> 0L
158 timeMillis >= MAX_MS -> Long.MAX_VALUE
159 else -> timeMillis * MS_TO_NS
160 }
161
delayNanosToMillisnull162 internal fun delayNanosToMillis(timeNanos: Long): Long =
163 timeNanos / MS_TO_NS
164
165 @SharedImmutable
166 private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")
167
168 private typealias Queue<T> = LockFreeTaskQueueCore<T>
169
170 internal expect abstract class EventLoopImplPlatform() : EventLoop {
171 // Called to unpark this event loop's thread
172 protected fun unpark()
173
174 // Called to reschedule to DefaultExecutor when this event loop is complete
175 protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
176 }
177
178 internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
179 // null | CLOSED_EMPTY | task | Queue<Runnable>
180 private val _queue = atomic<Any?>(null)
181
182 // Allocated only only once
183 private val _delayed = atomic<DelayedTaskQueue?>(null)
184
185 @Volatile
186 private var isCompleted = false
187
188 override val isEmpty: Boolean get() {
189 if (!isUnconfinedQueueEmpty) return false
190 val delayed = _delayed.value
191 if (delayed != null && !delayed.isEmpty) return false
192 val queue = _queue.value
193 return when (queue) {
194 null -> true
195 is Queue<*> -> queue.isEmpty
196 else -> queue === CLOSED_EMPTY
197 }
198 }
199
200 protected override val nextTime: Long
201 get() {
202 if (super.nextTime == 0L) return 0L
203 val queue = _queue.value
204 when {
205 queue === null -> {} // empty queue -- proceed
206 queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
207 queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
208 else -> return 0 // non-empty queue
209 }
210 val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
211 return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
212 }
213
shutdownnull214 override fun shutdown() {
215 // Clean up thread-local reference here -- this event loop is shutting down
216 ThreadLocalEventLoop.resetEventLoop()
217 // We should signal that this event loop should not accept any more tasks
218 // and process queued events (that could have been added after last processNextEvent)
219 isCompleted = true
220 closeQueue()
221 // complete processing of all queued tasks
222 while (processNextEvent() <= 0) { /* spin */ }
223 // reschedule the rest of delayed tasks
224 rescheduleAllDelayed()
225 }
226
scheduleResumeAfterDelaynull227 public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
228 val timeNanos = delayToNanos(timeMillis)
229 if (timeNanos < MAX_DELAY_NS) {
230 val now = nanoTime()
231 DelayedResumeTask(now + timeNanos, continuation).also { task ->
232 continuation.disposeOnCancellation(task)
233 schedule(now, task)
234 }
235 }
236 }
237
scheduleInvokeOnTimeoutnull238 protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
239 val timeNanos = delayToNanos(timeMillis)
240 return if (timeNanos < MAX_DELAY_NS) {
241 val now = nanoTime()
242 DelayedRunnableTask(now + timeNanos, block).also { task ->
243 schedule(now, task)
244 }
245 } else {
246 NonDisposableHandle
247 }
248 }
249
processNextEventnull250 override fun processNextEvent(): Long {
251 // unconfined events take priority
252 if (processUnconfinedEvent()) return nextTime
253 // queue all delayed tasks that are due to be executed
254 val delayed = _delayed.value
255 if (delayed != null && !delayed.isEmpty) {
256 val now = nanoTime()
257 while (true) {
258 // make sure that moving from delayed to queue removes from delayed only after it is added to queue
259 // to make sure that 'isEmpty' and `nextTime` that check both of them
260 // do not transiently report that both delayed and queue are empty during move
261 delayed.removeFirstIf {
262 if (it.timeToExecute(now)) {
263 enqueueImpl(it)
264 } else
265 false
266 } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
267 }
268 }
269 // then process one event from queue
270 dequeue()?.run()
271 return nextTime
272 }
273
dispatchnull274 public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
275
276 public fun enqueue(task: Runnable) {
277 if (enqueueImpl(task)) {
278 // todo: we should unpark only when this delayed task became first in the queue
279 unpark()
280 } else {
281 DefaultExecutor.enqueue(task)
282 }
283 }
284
285 @Suppress("UNCHECKED_CAST")
enqueueImplnull286 private fun enqueueImpl(task: Runnable): Boolean {
287 _queue.loop { queue ->
288 if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
289 when (queue) {
290 null -> if (_queue.compareAndSet(null, task)) return true
291 is Queue<*> -> {
292 when ((queue as Queue<Runnable>).addLast(task)) {
293 Queue.ADD_SUCCESS -> return true
294 Queue.ADD_CLOSED -> return false
295 Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
296 }
297 }
298 else -> when {
299 queue === CLOSED_EMPTY -> return false
300 else -> {
301 // update to full-blown queue to add one more
302 val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
303 newQueue.addLast(queue as Runnable)
304 newQueue.addLast(task)
305 if (_queue.compareAndSet(queue, newQueue)) return true
306 }
307 }
308 }
309 }
310 }
311
312 @Suppress("UNCHECKED_CAST")
dequeuenull313 private fun dequeue(): Runnable? {
314 _queue.loop { queue ->
315 when (queue) {
316 null -> return null
317 is Queue<*> -> {
318 val result = (queue as Queue<Runnable>).removeFirstOrNull()
319 if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
320 _queue.compareAndSet(queue, queue.next())
321 }
322 else -> when {
323 queue === CLOSED_EMPTY -> return null
324 else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
325 }
326 }
327 }
328 }
329
closeQueuenull330 private fun closeQueue() {
331 assert { isCompleted }
332 _queue.loop { queue ->
333 when (queue) {
334 null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
335 is Queue<*> -> {
336 queue.close()
337 return
338 }
339 else -> when {
340 queue === CLOSED_EMPTY -> return
341 else -> {
342 // update to full-blown queue to close
343 val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
344 newQueue.addLast(queue as Runnable)
345 if (_queue.compareAndSet(queue, newQueue)) return
346 }
347 }
348 }
349 }
350
351 }
352
schedulenull353 public fun schedule(now: Long, delayedTask: DelayedTask) {
354 when (scheduleImpl(now, delayedTask)) {
355 SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
356 SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
357 SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
358 else -> error("unexpected result")
359 }
360 }
361
shouldUnparknull362 private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task
363
364 private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
365 if (isCompleted) return SCHEDULE_COMPLETED
366 val delayedQueue = _delayed.value ?: run {
367 _delayed.compareAndSet(null, DelayedTaskQueue(now))
368 _delayed.value!!
369 }
370 return delayedTask.scheduleTask(now, delayedQueue, this)
371 }
372
373 // It performs "hard" shutdown for test cleanup purposes
resetAllnull374 protected fun resetAll() {
375 _queue.value = null
376 _delayed.value = null
377 }
378
379 // This is a "soft" (normal) shutdown
rescheduleAllDelayednull380 private fun rescheduleAllDelayed() {
381 val now = nanoTime()
382 while (true) {
383 /*
384 * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
385 * synchronized on DelayedTask itself. All other operation are synchronized both on
386 * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
387 * first removes DelayedTask from the heap (under synchronization) then
388 * assign "_heap = DISPOSED_TASK", so there cannot be ever a race to _heap reference update.
389 */
390 val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
391 reschedule(now, delayedTask)
392 }
393 }
394
395 internal abstract class DelayedTask(
396 /**
397 * This field can be only modified in [scheduleTask] before putting this DelayedTask
398 * into heap to avoid overflow and corruption of heap data structure.
399 */
400 @JvmField var nanoTime: Long
401 ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
402 private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK
403
404 override var heap: ThreadSafeHeap<*>?
405 get() = _heap as? ThreadSafeHeap<*>
406 set(value) {
407 require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
408 _heap = value
409 }
410
411 override var index: Int = -1
412
compareTonull413 override fun compareTo(other: DelayedTask): Int {
414 val dTime = nanoTime - other.nanoTime
415 return when {
416 dTime > 0 -> 1
417 dTime < 0 -> -1
418 else -> 0
419 }
420 }
421
timeToExecutenull422 fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
423
424 @Synchronized
425 fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int {
426 if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
427 delayed.addLastIf(this) { firstTask ->
428 if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
429 /**
430 * We are about to add new task and we have to make sure that [DelayedTaskQueue]
431 * invariant is maintained. The code in this lambda is additionally executed under
432 * the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
433 */
434 if (firstTask == null) {
435 /**
436 * When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to
437 * the current now time even if that means "going backwards in time". This makes the structure
438 * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
439 * are removed from the delayed queue for execution.
440 */
441 delayed.timeNow = now
442 } else {
443 /**
444 * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time
445 * and only goes forward in time. We cannot let it go backwards in time or invariant can be
446 * violated for tasks that were already scheduled.
447 */
448 val firstTime = firstTask.nanoTime
449 // compute min(now, firstTime) using a wrap-safe check
450 val minTime = if (firstTime - now >= 0) now else firstTime
451 // update timeNow only when going forward in time
452 if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
453 }
454 /**
455 * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added
456 * task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
457 * function can be called to reschedule from one queue to another and this might be another reason
458 * where new task's time might now violate invariant.
459 * We correct invariant violation (if any) by simply changing this task's time to now.
460 */
461 if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
462 true
463 }
464 return SCHEDULE_OK
465 }
466
467 @Synchronized
disposenull468 final override fun dispose() {
469 val heap = _heap
470 if (heap === DISPOSED_TASK) return // already disposed
471 @Suppress("UNCHECKED_CAST")
472 (heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
473 _heap = DISPOSED_TASK // never add again to any heap
474 }
475
toStringnull476 override fun toString(): String = "Delayed[nanos=$nanoTime]"
477 }
478
479 private inner class DelayedResumeTask(
480 nanoTime: Long,
481 private val cont: CancellableContinuation<Unit>
482 ) : DelayedTask(nanoTime) {
483 override fun run() { with(cont) { resumeUndispatched(Unit) } }
484 override fun toString(): String = super.toString() + cont.toString()
485 }
486
487 private class DelayedRunnableTask(
488 nanoTime: Long,
489 private val block: Runnable
490 ) : DelayedTask(nanoTime) {
runnull491 override fun run() { block.run() }
toStringnull492 override fun toString(): String = super.toString() + block.toString()
493 }
494
495 /**
496 * Delayed task queue maintains stable time-comparision invariant despite potential wraparounds in
497 * long nano time measurements by maintaining last observed [timeNow]. It protects the integrity of the
498 * heap data structure in spite of potential non-monotonicity of `nanoTime()` source.
499 * The invariant is that for every scheduled [DelayedTask]:
500 *
501 * ```
502 * delayedTask.nanoTime - timeNow >= 0
503 * ```
504 *
505 * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable as
506 * scheduled [DelayedTask.nanoTime] can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
507 * new tasks are added by [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed
508 * (so there is nothing special to do there).
509 */
510 internal class DelayedTaskQueue(
511 @JvmField var timeNow: Long
512 ) : ThreadSafeHeap<DelayedTask>()
513 }
514
515 internal expect fun createEventLoop(): EventLoop
516
517 internal expect fun nanoTime(): Long
518
519 internal expect object DefaultExecutor {
520 public fun enqueue(task: Runnable)
521 }
522
523