• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.test.internal
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.test.*
6 import kotlin.coroutines.*
7 
/**
 * The testable main dispatcher used by kotlinx-coroutines-test.
 * It is a [MainCoroutineDispatcher] that delegates all actions to a settable delegate.
 *
 * While no delegate is set (see [setDispatcher] and [resetDispatcher]), every action is forwarded to the dispatcher
 * produced by `createInnerMain`, which is invoked lazily, on first use.
 *
 * @param createInnerMain factory for the fallback dispatcher that is used while no delegate is set.
 */
internal class TestMainDispatcher(createInnerMain: () -> CoroutineDispatcher):
    MainCoroutineDispatcher(),
    Delay
{
    /** Creates a [TestMainDispatcher] whose fallback dispatcher is the already constructed [delegate]. */
    internal constructor(delegate: CoroutineDispatcher): this({ delegate })

    // Evaluated on first access only: `createInnerMain` is not called for as long as a delegate is set.
    private val mainDispatcher by lazy(createInnerMain)

    // The holder is never reassigned, only the value it wraps changes, hence `val`.
    private val delegate = NonConcurrentlyModifiable<CoroutineDispatcher?>(null, "Dispatchers.Main")

    /** The dispatcher that currently receives all calls: the delegate if one is set, the fallback otherwise. */
    private val dispatcher
        get() = delegate.value ?: mainDispatcher

    /** The [Delay] of the current [dispatcher], or [defaultDelay] if that dispatcher does not implement [Delay]. */
    private val delay
        get() = dispatcher as? Delay ?: defaultDelay

    /**
     * The `immediate` dispatcher of the current [dispatcher] if it is a [MainCoroutineDispatcher],
     * otherwise this dispatcher itself.
     */
    override val immediate: MainCoroutineDispatcher
        get() = (dispatcher as? MainCoroutineDispatcher)?.immediate ?: this

    /** Forwards the call to the current [dispatcher]. */
    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatcher.dispatch(context, block)

    /** Forwards the call to the current [dispatcher]. */
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)

    /** Forwards the call to the current [dispatcher]. */
    override fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatcher.dispatchYield(context, block)

    /**
     * Makes [dispatcher] the delegate that receives all subsequent calls.
     *
     * @throws IllegalStateException if the delegate is detected to be read or modified concurrently
     * (see [NonConcurrentlyModifiable]).
     */
    fun setDispatcher(dispatcher: CoroutineDispatcher) {
        delegate.value = dispatcher
    }

    /**
     * Clears the delegate, so that subsequent calls go to the fallback dispatcher again.
     *
     * @throws IllegalStateException if the delegate is detected to be read or modified concurrently
     * (see [NonConcurrentlyModifiable]).
     */
    fun resetDispatcher() {
        delegate.value = null
    }

    /** Forwards the call to the current [delay]. */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) =
        delay.scheduleResumeAfterDelay(timeMillis, continuation)

    /** Forwards the call to the current [delay]. */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        delay.invokeOnTimeout(timeMillis, block, context)

    companion object {
        /**
         * The [TestDispatcher] currently set as the delegate of [Dispatchers.Main], or `null` if [Dispatchers.Main]
         * is not a [TestMainDispatcher], no delegate is set, or the delegate is not a [TestDispatcher].
         */
        internal val currentTestDispatcher
            get() = (Dispatchers.Main as? TestMainDispatcher)?.delegate?.value as? TestDispatcher

        /** The scheduler of [currentTestDispatcher], or `null` if there is no such dispatcher. */
        internal val currentTestScheduler
            get() = currentTestDispatcher?.scheduler
    }

    /**
     * A wrapper around a value that attempts to throw when writing happens concurrently with reading.
     *
     * The read operations never throw. Instead, the failures detected inside them will be remembered and thrown on the
     * next modification.
     *
     * @param initialValue the value returned by reads until the first write.
     * @param name how the wrapped entity is called in the exception messages.
     */
    private class NonConcurrentlyModifiable<T>(initialValue: T, private val name: String) {
        private val reader: AtomicRef<Throwable?> = atomic(null) // last reader to attempt access
        private val readers = atomic(0) // number of concurrent readers
        private val writer: AtomicRef<Throwable?> = atomic(null) // writer currently performing value modification
        private val exceptionWhenReading: AtomicRef<Throwable?> = atomic(null) // exception from reading
        private val _value = atomic(initialValue) // the backing field for the value

        // Both factories attach `location`, the stack trace captured by the conflicting party, as the cause.
        private fun concurrentWW(location: Throwable) = IllegalStateException("$name is modified concurrently", location)
        private fun concurrentRW(location: Throwable) = IllegalStateException("$name is used concurrently with setting it", location)

        /**
         * The wrapped value.
         *
         * The order of the statements in both accessors is significant for the detection of races; do not reorder.
         */
        var value: T
            get() {
                // Publish where this read happens so that a racing writer can report it.
                reader.value = Throwable("reader location")
                readers.incrementAndGet()
                // A write is in progress: remember the failure instead of throwing, the next write will rethrow it.
                writer.value?.let { exceptionWhenReading.value = concurrentRW(it) }
                val result = _value.value
                readers.decrementAndGet()
                return result
            }
            set(value) {
                // Report (and clear) the race that an earlier read has recorded.
                exceptionWhenReading.getAndSet(null)?.let { throw it }
                // Some read has not finished yet: fail before touching the value.
                if (readers.value != 0) reader.value?.let { throw concurrentRW(it) }
                val writerLocation = Throwable("other writer location")
                // Register as the current writer; a non-null previous entry means another write is in progress.
                // Note that when this throws, `writer` keeps holding `writerLocation`.
                writer.getAndSet(writerLocation)?.let { throw concurrentWW(it) }
                _value.value = value
                // Unregister, unless some other writer has replaced the entry in the meantime.
                writer.compareAndSet(writerLocation, null)
                // A read has started while the value was being written.
                if (readers.value != 0) reader.value?.let { throw concurrentRW(it) }
            }
    }
}
94 
/**
 * The delay implementation that [TestMainDispatcher] falls back to when its current dispatcher
 * does not implement [Delay].
 */
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2
private inline val defaultDelay: Delay
    get() = DefaultDelay
98 
/**
 * Platform-specific accessor that yields a [TestMainDispatcher] for the [Dispatchers] receiver.
 *
 * Only the `expect` declaration is visible here; each platform supplies the `actual` implementation.
 * NOTE(review): presumably this returns the dispatcher installed as `Dispatchers.Main`; confirm against the
 * `actual` implementations.
 */
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2
internal expect fun Dispatchers.getTestMainDispatcher(): TestMainDispatcher
101