• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 @file:Suppress("UNUSED_VARIABLE")
6 
7 package kotlinx.coroutines.scheduling
8 
9 import kotlinx.atomicfu.*
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.internal.*
12 import org.junit.*
13 import kotlin.coroutines.*
14 
/**
 * Base class for scheduler tests.
 *
 * Lazily creates a single [ExperimentalCoroutineDispatcher] from [corePoolSize], [maxPoolSize] and
 * [idleWorkerKeepAliveNs], hands out contexts built on top of it (always combined with an exception
 * handler that records the most recently reported exception), and in [after] closes the dispatcher
 * and rethrows the recorded exception, if any.
 */
abstract class SchedulerTestBase : TestBase() {
    companion object {
        val CORES_COUNT = AVAILABLE_PROCESSORS

        /**
         * Asserts that [expectedThreadsCount] pool worker threads were created.
         * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking.
         *
         * The number of created threads is taken from the highest sequence number found in the names
         * of the currently live worker threads.
         *
         * @throws IllegalArgumentException if the count differs from [expectedThreadsCount] or if no
         * pool worker thread is alive, in which case the count cannot be determined.
         */
        fun checkPoolThreadsCreated(expectedThreadsCount: Int = CORES_COUNT) {
            val threadsCount = requireNotNull(maxSequenceNumber()) {
                "Expected $expectedThreadsCount pool threads, but no pool worker threads are alive"
            }
            require(threadsCount == expectedThreadsCount) {
                "Expected $expectedThreadsCount pool threads, but has $threadsCount"
            }
        }

        /**
         * Asserts that any number of pool worker threads in [range] were created.
         * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking.
         *
         * @throws IllegalArgumentException if the highest worker sequence number is outside [range] or
         * if no pool worker thread is alive, in which case the count cannot be determined.
         */
        fun checkPoolThreadsCreated(range: IntRange) {
            val maxSequenceNumber = requireNotNull(maxSequenceNumber()) {
                "Expected pool threads to be in interval $range, but no pool worker threads are alive"
            }
            require(maxSequenceNumber in range) {
                "Expected pool threads to be in interval $range, but has $maxSequenceNumber"
            }
        }

        /**
         * Asserts that any number of pool worker threads in [range] exists at the time of method invocation.
         *
         * @throws IllegalArgumentException if the number of live worker threads is outside [range].
         */
        fun checkPoolThreadsExist(range: IntRange) {
            val threads = Thread.getAllStackTraces().keys.count { it is CoroutineScheduler.Worker }
            require(threads in range) { "Expected threads in $range interval, but has $threads" }
        }

        /**
         * Returns the highest sequence number among the currently live pool worker threads,
         * or `null` when there are no such threads.
         */
        private fun maxSequenceNumber(): Int? =
            Thread.getAllStackTraces().keys.asSequence()
                .filterIsInstance<CoroutineScheduler.Worker>()
                .map { sequenceNumber(it.name) }
                // Deliberately not `max()`: its nullability contract differs between Kotlin stdlib versions.
                .sortedDescending()
                .firstOrNull()

        /**
         * Parses the worker sequence number out of [threadName]: the integer that follows the last `-`
         * and ends at the first space after it (or at the end of the name when there is no space).
         *
         * @throws NumberFormatException if that part of the name is not a valid integer.
         */
        private fun sequenceNumber(threadName: String): Int =
            threadName.substringAfterLast("-").substringBefore(' ').toInt()

        /** Suspends until every job of this iterable has completed, joining them one after another. */
        suspend fun Iterable<Job>.joinAll() = forEach { it.join() }
    }

    // Most recent exception reported to [handler]; rethrown from [after].
    private val exception = atomic<Throwable?>(null)
    private val handler = CoroutineExceptionHandler { _, e -> exception.value = e }

    // Pool parameters; they are read when the dispatcher is first created, so set them before first use.
    protected var corePoolSize = 1
    protected var maxPoolSize = 1024
    protected var idleWorkerKeepAliveNs = IDLE_WORKER_KEEP_ALIVE_NS

    private var _dispatcher: ExperimentalCoroutineDispatcher? = null

    /** Returns the test dispatcher, creating it from the current pool parameters on first use. */
    private fun ensureDispatcher(): ExperimentalCoroutineDispatcher =
        _dispatcher ?: ExperimentalCoroutineDispatcher(
            corePoolSize,
            maxPoolSize,
            idleWorkerKeepAliveNs
        ).also { _dispatcher = it }

    /** Context consisting of the lazily created test dispatcher and the exception-recording handler. */
    protected val dispatcher: CoroutineContext
        get() = ensureDispatcher() + handler

    /** Lazily created blocking view of the test dispatcher with parallelism 1000. */
    protected var blockingDispatcher = lazy {
        blockingDispatcher(1000)
    }

    /**
     * Returns a context made of a blocking view of the test dispatcher with the given [parallelism]
     * and the exception-recording handler. Creates the test dispatcher if needed.
     */
    protected fun blockingDispatcher(parallelism: Int): CoroutineContext =
        ensureDispatcher().blocking(parallelism) + handler

    /**
     * Returns a context made of a limited view of the test dispatcher with the given [parallelism]
     * and the exception-recording handler. Creates the test dispatcher if needed.
     */
    protected fun view(parallelism: Int): CoroutineContext =
        ensureDispatcher().limited(parallelism) + handler

    /**
     * Closes the test dispatcher (if it was ever created) and then rethrows the exception recorded
     * by the handler, if there is one.
     */
    @After
    fun after() {
        runBlocking {
            withTimeout(5_000) {
                _dispatcher?.close()
            }
        }
        exception.value?.let { throw it }
    }
}