1 /*
<lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.internal.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11 import kotlin.native.concurrent.*
12
13 /**
14 * Extended by [CoroutineDispatcher] implementations that have event loop inside and can
15 * be asked to process next event from their event queue.
16 *
17 * It may optionally implement [Delay] interface and support time-scheduled tasks.
18 * It is created or pigged back onto (see [ThreadLocalEventLoop])
19 * by `runBlocking` and by [Dispatchers.Unconfined].
20 *
21 * @suppress **This an internal API and should not be used from general code.**
22 */
23 internal abstract class EventLoop : CoroutineDispatcher() {
24 /**
25 * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop.
26 */
27 private var useCount = 0L
28
29 /**
30 * Set to true on any use by `runBlocking`, because it potentially leaks this loop to other threads, so
31 * this instance must be properly shutdown. We don't need to shutdown event loop that was used solely
32 * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
33 */
34 private var shared = false
35
36 /**
37 * Queue used by [Dispatchers.Unconfined] tasks.
38 * These tasks are thread-local for performance and take precedence over the rest of the queue.
39 */
40 private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
41
42 /**
43 * Processes next event in this event loop.
44 *
45 * The result of this function is to be interpreted like this:
46 * * `<= 0` -- there are potentially more events for immediate processing;
47 * * `> 0` -- a number of nanoseconds to wait for next scheduled event;
48 * * [Long.MAX_VALUE] -- no more events.
49 *
50 * **NOTE**: Must be invoked only from the event loop's thread
51 * (no check for performance reasons, may be added in the future).
52 */
53 public open fun processNextEvent(): Long {
54 if (!processUnconfinedEvent()) return Long.MAX_VALUE
55 return 0
56 }
57
58 protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
59
60 protected open val nextTime: Long
61 get() {
62 val queue = unconfinedQueue ?: return Long.MAX_VALUE
63 return if (queue.isEmpty) Long.MAX_VALUE else 0L
64 }
65
66 public fun processUnconfinedEvent(): Boolean {
67 val queue = unconfinedQueue ?: return false
68 val task = queue.removeFirstOrNull() ?: return false
69 task.run()
70 return true
71 }
72 /**
73 * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
74 * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
75 * By default, event loop implementation is thread-local and should not processed in the context
76 * (current thread's event loop should be processed instead).
77 */
78 public open fun shouldBeProcessedFromContext(): Boolean = false
79
80 /**
81 * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
82 * into the current event loop.
83 */
84 public fun dispatchUnconfined(task: DispatchedTask<*>) {
85 val queue = unconfinedQueue ?:
86 ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
87 queue.addLast(task)
88 }
89
90 public val isActive: Boolean
91 get() = useCount > 0
92
93 public val isUnconfinedLoopActive: Boolean
94 get() = useCount >= delta(unconfined = true)
95
96 // May only be used from the event loop's thread
97 public val isUnconfinedQueueEmpty: Boolean
98 get() = unconfinedQueue?.isEmpty ?: true
99
100 private fun delta(unconfined: Boolean) =
101 if (unconfined) (1L shl 32) else 1L
102
103 fun incrementUseCount(unconfined: Boolean = false) {
104 useCount += delta(unconfined)
105 if (!unconfined) shared = true
106 }
107
108 fun decrementUseCount(unconfined: Boolean = false) {
109 useCount -= delta(unconfined)
110 if (useCount > 0) return
111 assert { useCount == 0L } // "Extra decrementUseCount"
112 if (shared) {
113 // shut it down and remove from ThreadLocalEventLoop
114 shutdown()
115 }
116 }
117
118 protected open fun shutdown() {}
119 }
120
121 @ThreadLocal
122 internal object ThreadLocalEventLoop {
123 private val ref = CommonThreadLocal<EventLoop?>()
124
125 internal val eventLoop: EventLoop
<lambda>null126 get() = ref.get() ?: createEventLoop().also { ref.set(it) }
127
currentOrNullnull128 internal fun currentOrNull(): EventLoop? =
129 ref.get()
130
131 internal fun resetEventLoop() {
132 ref.set(null)
133 }
134
setEventLoopnull135 internal fun setEventLoop(eventLoop: EventLoop) {
136 ref.set(eventLoop)
137 }
138 }
139
140 @SharedImmutable
141 private val DISPOSED_TASK = Symbol("REMOVED_TASK")
142
143 // results for scheduleImpl
144 private const val SCHEDULE_OK = 0
145 private const val SCHEDULE_COMPLETED = 1
146 private const val SCHEDULE_DISPOSED = 2
147
148 private const val MS_TO_NS = 1_000_000L
149 private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS
150
151 /**
152 * First-line overflow protection -- limit maximal delay.
153 * Delays longer than this one (~146 years) are considered to be delayed "forever".
154 */
155 private const val MAX_DELAY_NS = Long.MAX_VALUE / 2
156
delayToNanosnull157 internal fun delayToNanos(timeMillis: Long): Long = when {
158 timeMillis <= 0 -> 0L
159 timeMillis >= MAX_MS -> Long.MAX_VALUE
160 else -> timeMillis * MS_TO_NS
161 }
162
delayNanosToMillisnull163 internal fun delayNanosToMillis(timeNanos: Long): Long =
164 timeNanos / MS_TO_NS
165
166 @SharedImmutable
167 private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")
168
169 private typealias Queue<T> = LockFreeTaskQueueCore<T>
170
171 internal expect abstract class EventLoopImplPlatform() : EventLoop {
172 // Called to unpark this event loop's thread
173 protected fun unpark()
174
175 // Called to reschedule to DefaultExecutor when this event loop is complete
176 protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
177 }
178
179 internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
180 // null | CLOSED_EMPTY | task | Queue<Runnable>
181 private val _queue = atomic<Any?>(null)
182
183 // Allocated only only once
184 private val _delayed = atomic<DelayedTaskQueue?>(null)
185
186 private val _isCompleted = atomic(false)
187 private var isCompleted
188 get() = _isCompleted.value
189 set(value) { _isCompleted.value = value }
190
191 override val isEmpty: Boolean get() {
192 if (!isUnconfinedQueueEmpty) return false
193 val delayed = _delayed.value
194 if (delayed != null && !delayed.isEmpty) return false
195 return when (val queue = _queue.value) {
196 null -> true
197 is Queue<*> -> queue.isEmpty
198 else -> queue === CLOSED_EMPTY
199 }
200 }
201
202 protected override val nextTime: Long
203 get() {
204 if (super.nextTime == 0L) return 0L
205 val queue = _queue.value
206 when {
207 queue === null -> {} // empty queue -- proceed
208 queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
209 queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
210 else -> return 0 // non-empty queue
211 }
212 val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
213 return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
214 }
215
shutdownnull216 override fun shutdown() {
217 // Clean up thread-local reference here -- this event loop is shutting down
218 ThreadLocalEventLoop.resetEventLoop()
219 // We should signal that this event loop should not accept any more tasks
220 // and process queued events (that could have been added after last processNextEvent)
221 isCompleted = true
222 closeQueue()
223 // complete processing of all queued tasks
224 while (processNextEvent() <= 0) { /* spin */ }
225 // reschedule the rest of delayed tasks
226 rescheduleAllDelayed()
227 }
228
scheduleResumeAfterDelaynull229 public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
230 val timeNanos = delayToNanos(timeMillis)
231 if (timeNanos < MAX_DELAY_NS) {
232 val now = nanoTime()
233 DelayedResumeTask(now + timeNanos, continuation).also { task ->
234 continuation.disposeOnCancellation(task)
235 schedule(now, task)
236 }
237 }
238 }
239
scheduleInvokeOnTimeoutnull240 protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
241 val timeNanos = delayToNanos(timeMillis)
242 return if (timeNanos < MAX_DELAY_NS) {
243 val now = nanoTime()
244 DelayedRunnableTask(now + timeNanos, block).also { task ->
245 schedule(now, task)
246 }
247 } else {
248 NonDisposableHandle
249 }
250 }
251
processNextEventnull252 override fun processNextEvent(): Long {
253 // unconfined events take priority
254 if (processUnconfinedEvent()) return 0
255 // queue all delayed tasks that are due to be executed
256 val delayed = _delayed.value
257 if (delayed != null && !delayed.isEmpty) {
258 val now = nanoTime()
259 while (true) {
260 // make sure that moving from delayed to queue removes from delayed only after it is added to queue
261 // to make sure that 'isEmpty' and `nextTime` that check both of them
262 // do not transiently report that both delayed and queue are empty during move
263 delayed.removeFirstIf {
264 if (it.timeToExecute(now)) {
265 enqueueImpl(it)
266 } else
267 false
268 } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
269 }
270 }
271 // then process one event from queue
272 val task = dequeue()
273 if (task != null) {
274 task.run()
275 return 0
276 }
277 return nextTime
278 }
279
dispatchnull280 public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
281
282 public fun enqueue(task: Runnable) {
283 if (enqueueImpl(task)) {
284 // todo: we should unpark only when this delayed task became first in the queue
285 unpark()
286 } else {
287 DefaultExecutor.enqueue(task)
288 }
289 }
290
291 @Suppress("UNCHECKED_CAST")
enqueueImplnull292 private fun enqueueImpl(task: Runnable): Boolean {
293 _queue.loop { queue ->
294 if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
295 when (queue) {
296 null -> if (_queue.compareAndSet(null, task)) return true
297 is Queue<*> -> {
298 when ((queue as Queue<Runnable>).addLast(task)) {
299 Queue.ADD_SUCCESS -> return true
300 Queue.ADD_CLOSED -> return false
301 Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
302 }
303 }
304 else -> when {
305 queue === CLOSED_EMPTY -> return false
306 else -> {
307 // update to full-blown queue to add one more
308 val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
309 newQueue.addLast(queue as Runnable)
310 newQueue.addLast(task)
311 if (_queue.compareAndSet(queue, newQueue)) return true
312 }
313 }
314 }
315 }
316 }
317
318 @Suppress("UNCHECKED_CAST")
dequeuenull319 private fun dequeue(): Runnable? {
320 _queue.loop { queue ->
321 when (queue) {
322 null -> return null
323 is Queue<*> -> {
324 val result = (queue as Queue<Runnable>).removeFirstOrNull()
325 if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
326 _queue.compareAndSet(queue, queue.next())
327 }
328 else -> when {
329 queue === CLOSED_EMPTY -> return null
330 else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
331 }
332 }
333 }
334 }
335
closeQueuenull336 private fun closeQueue() {
337 assert { isCompleted }
338 _queue.loop { queue ->
339 when (queue) {
340 null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
341 is Queue<*> -> {
342 queue.close()
343 return
344 }
345 else -> when {
346 queue === CLOSED_EMPTY -> return
347 else -> {
348 // update to full-blown queue to close
349 val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
350 newQueue.addLast(queue as Runnable)
351 if (_queue.compareAndSet(queue, newQueue)) return
352 }
353 }
354 }
355 }
356
357 }
358
schedulenull359 public fun schedule(now: Long, delayedTask: DelayedTask) {
360 when (scheduleImpl(now, delayedTask)) {
361 SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
362 SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
363 SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
364 else -> error("unexpected result")
365 }
366 }
367
shouldUnparknull368 private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task
369
370 private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
371 if (isCompleted) return SCHEDULE_COMPLETED
372 val delayedQueue = _delayed.value ?: run {
373 _delayed.compareAndSet(null, DelayedTaskQueue(now))
374 _delayed.value!!
375 }
376 return delayedTask.scheduleTask(now, delayedQueue, this)
377 }
378
379 // It performs "hard" shutdown for test cleanup purposes
resetAllnull380 protected fun resetAll() {
381 _queue.value = null
382 _delayed.value = null
383 }
384
385 // This is a "soft" (normal) shutdown
rescheduleAllDelayednull386 private fun rescheduleAllDelayed() {
387 val now = nanoTime()
388 while (true) {
389 /*
390 * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
391 * synchronized on DelayedTask itself. All other operation are synchronized both on
392 * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
393 * first removes DelayedTask from the heap (under synchronization) then
394 * assign "_heap = DISPOSED_TASK", so there cannot be ever a race to _heap reference update.
395 */
396 val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
397 reschedule(now, delayedTask)
398 }
399 }
400
401 internal abstract class DelayedTask(
402 /**
403 * This field can be only modified in [scheduleTask] before putting this DelayedTask
404 * into heap to avoid overflow and corruption of heap data structure.
405 */
406 @JvmField var nanoTime: Long
407 ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
408 private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK
409
410 override var heap: ThreadSafeHeap<*>?
411 get() = _heap as? ThreadSafeHeap<*>
412 set(value) {
413 require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
414 _heap = value
415 }
416
417 override var index: Int = -1
418
compareTonull419 override fun compareTo(other: DelayedTask): Int {
420 val dTime = nanoTime - other.nanoTime
421 return when {
422 dTime > 0 -> 1
423 dTime < 0 -> -1
424 else -> 0
425 }
426 }
427
timeToExecutenull428 fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
429
430 @Synchronized
431 fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int {
432 if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
433 delayed.addLastIf(this) { firstTask ->
434 if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
435 /**
436 * We are about to add new task and we have to make sure that [DelayedTaskQueue]
437 * invariant is maintained. The code in this lambda is additionally executed under
438 * the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
439 */
440 if (firstTask == null) {
441 /**
442 * When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to
443 * the current now time even if that means "going backwards in time". This makes the structure
444 * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
445 * are removed from the delayed queue for execution.
446 */
447 delayed.timeNow = now
448 } else {
449 /**
450 * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time
451 * and only goes forward in time. We cannot let it go backwards in time or invariant can be
452 * violated for tasks that were already scheduled.
453 */
454 val firstTime = firstTask.nanoTime
455 // compute min(now, firstTime) using a wrap-safe check
456 val minTime = if (firstTime - now >= 0) now else firstTime
457 // update timeNow only when going forward in time
458 if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
459 }
460 /**
461 * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added
462 * task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
463 * function can be called to reschedule from one queue to another and this might be another reason
464 * where new task's time might now violate invariant.
465 * We correct invariant violation (if any) by simply changing this task's time to now.
466 */
467 if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
468 true
469 }
470 return SCHEDULE_OK
471 }
472
473 @Synchronized
disposenull474 final override fun dispose() {
475 val heap = _heap
476 if (heap === DISPOSED_TASK) return // already disposed
477 @Suppress("UNCHECKED_CAST")
478 (heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
479 _heap = DISPOSED_TASK // never add again to any heap
480 }
481
toStringnull482 override fun toString(): String = "Delayed[nanos=$nanoTime]"
483 }
484
485 private inner class DelayedResumeTask(
486 nanoTime: Long,
487 private val cont: CancellableContinuation<Unit>
488 ) : DelayedTask(nanoTime) {
489 override fun run() { with(cont) { resumeUndispatched(Unit) } }
490 override fun toString(): String = super.toString() + cont.toString()
491 }
492
493 private class DelayedRunnableTask(
494 nanoTime: Long,
495 private val block: Runnable
496 ) : DelayedTask(nanoTime) {
runnull497 override fun run() { block.run() }
toStringnull498 override fun toString(): String = super.toString() + block.toString()
499 }
500
501 /**
502 * Delayed task queue maintains stable time-comparision invariant despite potential wraparounds in
503 * long nano time measurements by maintaining last observed [timeNow]. It protects the integrity of the
504 * heap data structure in spite of potential non-monotonicity of `nanoTime()` source.
505 * The invariant is that for every scheduled [DelayedTask]:
506 *
507 * ```
508 * delayedTask.nanoTime - timeNow >= 0
509 * ```
510 *
511 * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable as
512 * scheduled [DelayedTask.nanoTime] can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
513 * new tasks are added by [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed
514 * (so there is nothing special to do there).
515 */
516 internal class DelayedTaskQueue(
517 @JvmField var timeNow: Long
518 ) : ThreadSafeHeap<DelayedTask>()
519 }
520
521 internal expect fun createEventLoop(): EventLoop
522
523 internal expect fun nanoTime(): Long
524
525 internal expect object DefaultExecutor {
526 public fun enqueue(task: Runnable)
527 }
528
529