1 /* 2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import kotlin.coroutines.* 8 import kotlin.jvm.* 9 10 internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineDispatcher(), Delay { 11 private val originalDispatcher = enclosingScope.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher 12 private val heap = ArrayList<TimedTask>() // TODO use MPP heap/ordered set implementation (commonize ThreadSafeHeap) 13 14 var currentTime = 0L 15 private set 16 17 init { 18 /* 19 * Launch "event-loop-owning" task on start of the virtual time event loop. 20 * It ensures the progress of the enclosing event-loop and polls the timed queue 21 * when the enclosing event loop is empty, emulating virtual time. 22 */ <lambda>null23 enclosingScope.launch { 24 while (true) { 25 val delayNanos = ThreadLocalEventLoop.currentOrNull()?.processNextEvent() 26 ?: error("Event loop is missing, virtual time source works only as part of event loop") 27 if (delayNanos <= 0) continue 28 if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) error("Unexpected external delay: $delayNanos") 29 val nextTask = heap.minBy { it.deadline } ?: return@launch 30 heap.remove(nextTask) 31 currentTime = nextTask.deadline 32 nextTask.run() 33 } 34 } 35 } 36 37 private inner class TimedTask( 38 private val runnable: Runnable, 39 @JvmField val deadline: Long <lambda>null40 ) : DisposableHandle, Runnable by runnable { 41 42 override fun dispose() { 43 heap.remove(this) 44 } 45 } 46 dispatchnull47 override fun dispatch(context: CoroutineContext, block: Runnable) { 48 originalDispatcher.dispatch(context, block) 49 } 50 51 @ExperimentalCoroutinesApi isDispatchNeedednull52 override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context) 53 54 override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { 55 val task = TimedTask(block, deadline(timeMillis)) 56 heap += task 57 return task 58 } 59 scheduleResumeAfterDelaynull60 override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { 61 val task = TimedTask(Runnable { with(continuation) { resumeUndispatched(Unit) } }, deadline(timeMillis)) 62 heap += task 63 continuation.invokeOnCancellation { task.dispose() } 64 } 65 deadlinenull66 private fun deadline(timeMillis: Long) = 67 if (timeMillis == Long.MAX_VALUE) Long.MAX_VALUE else currentTime + timeMillis 68 } 69 70 /** 71 * Runs a test ([TestBase.runTest]) with a virtual time source. 72 * This runner has the following constraints: 73 * 1) It works only in the event-loop environment and it is relying on it. 74 * None of the coroutines should be launched in any dispatcher different from a current 75 * 2) Regular tasks always dominate delayed ones. It means that 76 * `launch { while(true) yield() }` will block the progress of the delayed tasks 77 * 3) [TestBase.finish] should always be invoked. 78 * Given all the constraints into account, it is easy to mess up with a test and actually 79 * return from [withVirtualTime] before the test is executed completely. 80 * To decrease the probability of such error, additional `finish` constraint is added. 81 */ 82 public fun TestBase.withVirtualTime(block: suspend CoroutineScope.() -> Unit) = runTest { 83 withContext(Dispatchers.Unconfined) { 84 // Create a platform-independent event loop 85 val dispatcher = VirtualTimeDispatcher(this) 86 withContext(dispatcher) { block() } 87 ensureFinished() 88 } 89 } 90