1 /* <lambda>null2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 @file:Suppress("UNUSED_VARIABLE") 6 7 package kotlinx.coroutines.scheduling 8 9 import kotlinx.atomicfu.* 10 import kotlinx.coroutines.* 11 import kotlinx.coroutines.internal.* 12 import org.junit.* 13 import kotlin.coroutines.* 14 15 abstract class SchedulerTestBase : TestBase() { 16 companion object { 17 val CORES_COUNT = AVAILABLE_PROCESSORS 18 19 /** 20 * Asserts that [expectedThreadsCount] pool worker threads were created. 21 * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking 22 */ 23 fun checkPoolThreadsCreated(expectedThreadsCount: Int = CORES_COUNT) { 24 val threadsCount = maxSequenceNumber()!! 25 require(threadsCount == expectedThreadsCount) 26 { "Expected $expectedThreadsCount pool threads, but has $threadsCount" } 27 } 28 29 /** 30 * Asserts that any number of pool worker threads in [range] were created. 31 * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking 32 */ 33 fun checkPoolThreadsCreated(range: IntRange) { 34 val maxSequenceNumber = maxSequenceNumber()!! 35 require(maxSequenceNumber in range) { "Expected pool threads to be in interval $range, but has $maxSequenceNumber" } 36 } 37 38 /** 39 * Asserts that any number of pool worker threads in [range] exists at the time of method invocation 40 */ 41 fun checkPoolThreadsExist(range: IntRange) { 42 val threads = Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }.count() 43 require(threads in range) { "Expected threads in $range interval, but has $threads" } 44 } 45 46 private fun maxSequenceNumber(): Int? { 47 return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker } 48 .map { sequenceNumber(it.name) }.max() 49 } 50 51 private fun sequenceNumber(threadName: String): Int { 52 val suffix = threadName.substring(threadName.lastIndexOf("-") + 1) 53 val separatorIndex = suffix.indexOf(' ') 54 if (separatorIndex == -1) { 55 return suffix.toInt() 56 } 57 58 return suffix.substring(0, separatorIndex).toInt() 59 } 60 61 suspend fun Iterable<Job>.joinAll() = forEach { it.join() } 62 } 63 64 private val exception = atomic<Throwable?>(null) 65 private val handler = CoroutineExceptionHandler { _, e -> exception.value = e } 66 67 protected var corePoolSize = 1 68 protected var maxPoolSize = 1024 69 protected var idleWorkerKeepAliveNs = IDLE_WORKER_KEEP_ALIVE_NS 70 71 private var _dispatcher: ExperimentalCoroutineDispatcher? = null 72 protected val dispatcher: CoroutineContext 73 get() { 74 if (_dispatcher == null) { 75 _dispatcher = ExperimentalCoroutineDispatcher( 76 corePoolSize, 77 maxPoolSize, 78 idleWorkerKeepAliveNs 79 ) 80 } 81 82 return _dispatcher!! + handler 83 } 84 85 protected var blockingDispatcher = lazy { 86 blockingDispatcher(1000) 87 } 88 89 protected fun blockingDispatcher(parallelism: Int): CoroutineContext { 90 val intitialize = dispatcher 91 return _dispatcher!!.blocking(parallelism) + handler 92 } 93 94 protected fun view(parallelism: Int): CoroutineContext { 95 val intitialize = dispatcher 96 return _dispatcher!!.limited(parallelism) + handler 97 } 98 99 @After 100 fun after() { 101 runBlocking { 102 withTimeout(5_000) { 103 _dispatcher?.close() 104 } 105 } 106 exception.value?.let { throw it } 107 } 108 }