1 /* 2 * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import kotlinx.atomicfu.* 8 import kotlinx.coroutines.* 9 import kotlinx.coroutines.exceptions.* 10 import kotlin.test.* 11 12 class LimitedParallelismConcurrentTest : TestBase() { 13 14 private val targetParallelism = 4 15 private val iterations = 100_000 16 private val parallelism = atomic(0) 17 checkParallelismnull18 private fun checkParallelism() { 19 val value = parallelism.incrementAndGet() 20 randomWait() 21 assertTrue { value <= targetParallelism } 22 parallelism.decrementAndGet() 23 } 24 25 @Test <lambda>null26 fun testLimitedExecutor() = runMtTest { 27 val executor = newFixedThreadPoolContext(targetParallelism, "test") 28 val view = executor.limitedParallelism(targetParallelism) 29 doStress { 30 repeat(iterations) { 31 launch(view) { 32 checkParallelism() 33 } 34 } 35 } 36 executor.close() 37 } 38 doStressnull39 private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) { 40 repeat(stressTestMultiplier) { 41 coroutineScope { 42 block() 43 } 44 } 45 } 46 47 @Test <lambda>null48 fun testTaskFairness() = runMtTest { 49 val executor = newSingleThreadContext("test") 50 val view = executor.limitedParallelism(1) 51 val view2 = executor.limitedParallelism(1) 52 val j1 = launch(view) { 53 while (true) { 54 yield() 55 } 56 } 57 val j2 = launch(view2) { j1.cancel() } 58 joinAll(j1, j2) 59 executor.close() 60 } 61 } 62