1 /* 2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import org.junit.* 8 import org.junit.Test 9 import org.junit.runner.* 10 import org.junit.runners.* 11 import java.util.concurrent.* 12 import java.util.concurrent.atomic.* 13 import kotlin.test.* 14 15 @RunWith(Parameterized::class) 16 class LimitedParallelismStressTest(private val targetParallelism: Int) : TestBase() { 17 18 companion object { 19 @Parameterized.Parameters(name = "{0}") 20 @JvmStatic <lambda>null21 fun params(): Collection<Array<Any>> = listOf(1, 2, 3, 4).map { arrayOf(it) } 22 } 23 24 @get:Rule 25 val executor = ExecutorRule(targetParallelism * 2) 26 private val iterations = 100_000 27 28 private val parallelism = AtomicInteger(0) 29 checkParallelismnull30 private fun checkParallelism() { 31 val value = parallelism.incrementAndGet() 32 Thread.yield() 33 assertTrue { value <= targetParallelism } 34 parallelism.decrementAndGet() 35 } 36 37 @Test <lambda>null38 fun testLimitedExecutor() = runTest { 39 val view = executor.limitedParallelism(targetParallelism) 40 doStress { 41 repeat(iterations) { 42 launch(view) { 43 checkParallelism() 44 } 45 } 46 } 47 } 48 49 @Test <lambda>null50 fun testLimitedDispatchersIo() = runTest { 51 val view = Dispatchers.IO.limitedParallelism(targetParallelism) 52 doStress { 53 repeat(iterations) { 54 launch(view) { 55 checkParallelism() 56 } 57 } 58 } 59 } 60 61 @Test <lambda>null62 fun testLimitedDispatchersIoDispatchYield() = runTest { 63 val view = Dispatchers.IO.limitedParallelism(targetParallelism) 64 doStress { 65 launch(view) { 66 yield() 67 checkParallelism() 68 } 69 } 70 } 71 72 @Test <lambda>null73 fun testLimitedExecutorReachesTargetParallelism() = runTest { 74 val view = executor.limitedParallelism(targetParallelism) 75 doStress { 76 repeat(iterations) { 77 val barrier = CyclicBarrier(targetParallelism + 1) 78 repeat(targetParallelism) { 79 launch(view) { 80 barrier.await() 81 } 82 } 83 // Successfully awaited parallelism + 1 84 barrier.await() 85 coroutineContext.job.children.toList().joinAll() 86 } 87 } 88 } 89 doStressnull90 private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) { 91 repeat(stressTestMultiplier) { 92 coroutineScope { 93 block() 94 } 95 } 96 } 97 } 98