1 /* 2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import kotlin.coroutines.* 8 import kotlin.jvm.* 9 10 internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineDispatcher(), Delay { 11 private val originalDispatcher = enclosingScope.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher 12 private val heap = ArrayList<TimedTask>() // TODO use MPP heap/ordered set implementation (commonize ThreadSafeHeap) 13 14 var currentTime = 0L 15 private set 16 17 init { 18 /* 19 * Launch "event-loop-owning" task on start of the virtual time event loop. 20 * It ensures the progress of the enclosing event-loop and polls the timed queue 21 * when the enclosing event loop is empty, emulating virtual time. 22 */ <lambda>null23 enclosingScope.launch { 24 while (true) { 25 val delayNanos = ThreadLocalEventLoop.currentOrNull()?.processNextEvent() 26 ?: error("Event loop is missing, virtual time source works only as part of event loop") 27 if (delayNanos <= 0) continue 28 if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) error("Unexpected external delay: $delayNanos") 29 val nextTask = heap.minByOrNull { it.deadline } ?: return@launch 30 heap.remove(nextTask) 31 currentTime = nextTask.deadline 32 nextTask.run() 33 } 34 } 35 } 36 37 private inner class TimedTask( 38 private val runnable: Runnable, 39 @JvmField val deadline: Long <lambda>null40 ) : DisposableHandle, Runnable by runnable { 41 42 override fun dispose() { 43 heap.remove(this) 44 } 45 } 46 dispatchnull47 override fun dispatch(context: CoroutineContext, block: Runnable) { 48 originalDispatcher.dispatch(context, block) 49 } 50 isDispatchNeedednull51 override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context) 52 53 override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { 54 val task = TimedTask(block, deadline(timeMillis)) 55 heap += task 56 return task 57 } 58 scheduleResumeAfterDelaynull59 override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { 60 val task = TimedTask(Runnable { with(continuation) { resumeUndispatched(Unit) } }, deadline(timeMillis)) 61 heap += task 62 continuation.invokeOnCancellation { task.dispose() } 63 } 64 deadlinenull65 private fun deadline(timeMillis: Long) = 66 if (timeMillis == Long.MAX_VALUE) Long.MAX_VALUE else currentTime + timeMillis 67 } 68 69 /** 70 * Runs a test ([TestBase.runTest]) with a virtual time source. 71 * This runner has the following constraints: 72 * 1) It works only in the event-loop environment and it is relying on it. 73 * None of the coroutines should be launched in any dispatcher different from a current 74 * 2) Regular tasks always dominate delayed ones. It means that 75 * `launch { while(true) yield() }` will block the progress of the delayed tasks 76 * 3) [TestBase.finish] should always be invoked. 77 * Given all the constraints into account, it is easy to mess up with a test and actually 78 * return from [withVirtualTime] before the test is executed completely. 79 * To decrease the probability of such error, additional `finish` constraint is added. 80 */ 81 public fun TestBase.withVirtualTime(block: suspend CoroutineScope.() -> Unit) = runTest { 82 withContext(Dispatchers.Unconfined) { 83 // Create a platform-independent event loop 84 val dispatcher = VirtualTimeDispatcher(this) 85 withContext(dispatcher) { block() } 86 ensureFinished() 87 } 88 } 89