• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlin.coroutines.*
8 import kotlin.jvm.*
9 
/**
 * Test-only dispatcher that emulates virtual time on top of the event loop of [enclosingScope].
 *
 * Regular dispatches are forwarded to the dispatcher taken from the enclosing scope. Delayed work
 * ([scheduleResumeAfterDelay], [invokeOnTimeout]) is never actually waited for: it is stored in an
 * internal queue together with its virtual deadline and is run by a driver coroutine (launched in `init`)
 * once the enclosing event loop has nothing else to process, advancing [currentTime] to that deadline.
 *
 * NOTE(review): the queue is a plain [ArrayList] without any synchronization, so this class presumably
 * expects to be used from the single event-loop thread only — confirm against callers.
 *
 * @param enclosingScope scope whose context must contain a [CoroutineDispatcher] as its
 * [ContinuationInterceptor]; the driver coroutine is launched in this scope.
 */
internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineDispatcher(), Delay {
    // The cast fails fast when the enclosing scope has no dispatcher to delegate to.
    private val originalDispatcher = enclosingScope.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
    private val heap = ArrayList<TimedTask>() // TODO use MPP heap/ordered set implementation (commonize ThreadSafeHeap)

    /** Current virtual time; it is only ever advanced by the driver coroutine when it runs a timed task. */
    var currentTime = 0L
        private set

    init {
        /*
         * Launch "event-loop-owning" task on start of the virtual time event loop.
         * It ensures the progress of the enclosing event-loop and polls the timed queue
         * when the enclosing event loop is empty, emulating virtual time.
         */
        enclosingScope.launch {
            while (true) {
                val delayNanos = ThreadLocalEventLoop.currentOrNull()?.processNextEvent()
                    ?: error("Event loop is missing, virtual time source works only as part of event loop")
                // Non-positive result: keep draining the enclosing event loop before touching timed tasks.
                if (delayNanos <= 0) continue
                // The only positive value accepted here is Long.MAX_VALUE; any other one means that somebody
                // scheduled a real (non-virtual) delay in the enclosing event loop.
                if (delayNanos != Long.MAX_VALUE) error("Unexpected external delay: $delayNanos")
                // `minByOrNull` (not `minBy`): the empty queue is the regular termination condition of this
                // loop and must yield `null` instead of throwing.
                val nextTask = heap.minByOrNull { it.deadline } ?: return@launch
                heap.remove(nextTask)
                currentTime = nextTask.deadline
                nextTask.run()
            }
        }
    }

    /**
     * A delayed [runnable] paired with the virtual [deadline] at which it should be executed.
     * Disposing the task removes it from the queue so that it is never run.
     */
    private inner class TimedTask(
        private val runnable: Runnable,
        @JvmField val deadline: Long
    ) : DisposableHandle, Runnable by runnable {

        override fun dispose() {
            heap.remove(this)
        }
    }

    /** Forwards regular (non-delayed) dispatch to the dispatcher of the enclosing scope. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        originalDispatcher.dispatch(context, block)
    }

    /** Delegates the decision to the dispatcher of the enclosing scope. */
    @ExperimentalCoroutinesApi
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context)

    /**
     * Enqueues [block] to be run once virtual time reaches `currentTime + timeMillis`.
     *
     * @return a handle that removes the task from the queue when disposed.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val task = TimedTask(block, deadline(timeMillis))
        heap += task
        return task
    }

    /**
     * Enqueues the resumption of [continuation] at `currentTime + timeMillis` of virtual time.
     * The task is removed from the queue if the continuation is cancelled before that.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val task = TimedTask(Runnable { with(continuation) { resumeUndispatched(Unit) } }, deadline(timeMillis))
        heap += task
        continuation.invokeOnCancellation { task.dispose() }
    }

    /**
     * Converts a relative delay into an absolute virtual deadline.
     *
     * The result saturates at [Long.MAX_VALUE]: both for the explicit "infinite" delay and for any positive
     * delay whose sum with [currentTime] would overflow. Without saturation the sum wraps around to a negative
     * deadline, which would be picked before every legitimately scheduled task.
     */
    private fun deadline(timeMillis: Long): Long {
        val sum = currentTime + timeMillis
        return when {
            timeMillis == Long.MAX_VALUE -> Long.MAX_VALUE
            // Adding a positive value can only make the sum smaller when it has wrapped around.
            timeMillis > 0 && sum < currentTime -> Long.MAX_VALUE
            else -> sum
        }
    }
}
69 
/**
 * Executes [block] as a test (via [TestBase.runTest]) in which time is virtual rather than real.
 *
 * Restrictions of this runner:
 * 1) It relies on the event-loop environment and works only within it.
 *    No coroutine may be launched in a dispatcher other than the current one.
 * 2) Regular tasks always take precedence over delayed ones, so
 *    `launch { while(true) yield() }` prevents delayed tasks from ever making progress.
 * 3) [TestBase.finish] must always be invoked.
 *    Taking all these constraints into account, it is easy to get a test wrong and
 *    return from [withVirtualTime] before the test has actually run to completion.
 *    The mandatory `finish` call is there to make such a mistake less likely.
 */
public fun TestBase.withVirtualTime(block: suspend CoroutineScope.() -> Unit) = runTest {
    withContext(Dispatchers.Unconfined) {
        // The virtual time source is built on top of this scope, giving a platform-independent event loop
        val virtualTime = VirtualTimeDispatcher(this)
        withContext(virtualTime, block)
        ensureFinished()
    }
}
90