• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import org.junit.*
8 import org.junit.Test
9 import org.junit.runner.*
10 import org.junit.runners.*
11 import java.util.concurrent.*
12 import java.util.concurrent.atomic.*
13 import kotlin.test.*
14 
15 @RunWith(Parameterized::class)
16 class LimitedParallelismStressTest(private val targetParallelism: Int) : TestBase() {
17 
18     companion object {
19         @Parameterized.Parameters(name = "{0}")
20         @JvmStatic
<lambda>null21         fun params(): Collection<Array<Any>> = listOf(1, 2, 3, 4).map { arrayOf(it) }
22     }
23 
24     @get:Rule
25     val executor = ExecutorRule(targetParallelism * 2)
26     private val iterations = 100_000
27 
28     private val parallelism = AtomicInteger(0)
29 
checkParallelismnull30     private fun checkParallelism() {
31         val value = parallelism.incrementAndGet()
32         Thread.yield()
33         assertTrue { value <= targetParallelism }
34         parallelism.decrementAndGet()
35     }
36 
37     @Test
<lambda>null38     fun testLimitedExecutor() = runTest {
39         val view = executor.limitedParallelism(targetParallelism)
40         doStress {
41             repeat(iterations) {
42                 launch(view) {
43                     checkParallelism()
44                 }
45             }
46         }
47     }
48 
49     @Test
<lambda>null50     fun testLimitedDispatchersIo() = runTest {
51         val view = Dispatchers.IO.limitedParallelism(targetParallelism)
52         doStress {
53             repeat(iterations) {
54                 launch(view) {
55                     checkParallelism()
56                 }
57             }
58         }
59     }
60 
61     @Test
<lambda>null62     fun testLimitedDispatchersIoDispatchYield() = runTest {
63         val view = Dispatchers.IO.limitedParallelism(targetParallelism)
64         doStress {
65             launch(view) {
66                 yield()
67                 checkParallelism()
68             }
69         }
70     }
71 
72     @Test
<lambda>null73     fun testLimitedExecutorReachesTargetParallelism() = runTest {
74         val view = executor.limitedParallelism(targetParallelism)
75         doStress {
76             repeat(iterations) {
77                 val barrier = CyclicBarrier(targetParallelism + 1)
78                 repeat(targetParallelism) {
79                     launch(view) {
80                         barrier.await()
81                     }
82                 }
83                 // Successfully awaited parallelism + 1
84                 barrier.await()
85                 coroutineContext.job.children.toList().joinAll()
86             }
87         }
88     }
89 
doStressnull90     private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) {
91         repeat(stressTestMultiplier) {
92             coroutineScope {
93                 block()
94             }
95         }
96     }
97 }
98