• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.scheduling
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import org.junit.Test
6 import java.util.concurrent.*
7 import java.util.concurrent.atomic.AtomicInteger
8 
/**
 * Verifies that no more than [CORE_POOL_SIZE] tasks are ever inside [runAndCheck] at the same time
 * when they are launched on `Dispatchers.Default` while the launching coroutine switches to `Dispatchers.IO`.
 *
 * Every task blocks its thread on a shared [CountDownLatch], so a task beyond the limit can only start
 * if an extra thread picks it up; [runAndCheck] reports that as "Oversubscription detected".
 */
class CoroutineSchedulerOversubscriptionTest : TestBase() {

    /** Number of tasks that are currently inside [runAndCheck]. */
    private val inDefault = AtomicInteger(0)

    /**
     * Registers the caller in [inDefault], fails if that makes more than [CORE_POOL_SIZE] registered callers,
     * then blocks the current thread until this latch is released and unregisters the caller.
     *
     * @throws IllegalStateException if more than [CORE_POOL_SIZE] callers are registered at once.
     */
    private fun CountDownLatch.runAndCheck() {
        if (inDefault.incrementAndGet() > CORE_POOL_SIZE) {
            error("Oversubscription detected")
        }

        await()
        inDefault.decrementAndGet()
    }

    /**
     * Single-shot scenario: blocks `CORE_POOL_SIZE - 1` tasks on `Dispatchers.Default`, launches two more
     * from a coroutine running on `Dispatchers.Default`, and then moves that coroutine to `Dispatchers.IO`
     * for 100 ms before releasing everybody.
     */
    @Test
    fun testOverSubscriptionDeterministic() = runTest {
        val barrier = CountDownLatch(1)
        // CORE_POOL_SIZE parties: the (CORE_POOL_SIZE - 1) launched tasks plus the test body itself.
        val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE)
        // All threads but one
        repeat(CORE_POOL_SIZE - 1) {
            launch(Dispatchers.Default) {
                threadsOccupiedBarrier.await()
                barrier.runAndCheck()
            }
        }
        // Proceed only once every launched task has actually started running.
        threadsOccupiedBarrier.await()
        withContext(Dispatchers.Default) {
            // Put a task in a local queue, it will be stolen
            launch(Dispatchers.Default) {
                barrier.runAndCheck()
            }
            // Put one more task to trick the local queue check
            launch(Dispatchers.Default) {
                barrier.runAndCheck()
            }

            withContext(Dispatchers.IO) {
                try {
                    // Release the thread
                    delay(100)
                } finally {
                    // Unblock the waiting tasks even if the delay is cancelled.
                    barrier.countDown()
                }
            }
        }
    }

    /**
     * Stress variant of [testOverSubscriptionDeterministic]: repeats the same scenario
     * `1000 * stressTestMultiplierSqrt` times, using `yield()` instead of a fixed delay.
     */
    @Test
    fun testOverSubscriptionStress() = repeat(1000 * stressTestMultiplierSqrt) {
        // Start every iteration from a clean counter.
        inDefault.set(0)
        runTest {
            val barrier = CountDownLatch(1)
            // CORE_POOL_SIZE parties: the (CORE_POOL_SIZE - 1) launched tasks plus the test body itself.
            val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE)
            // All threads but one
            repeat(CORE_POOL_SIZE - 1) {
                launch(Dispatchers.Default) {
                    threadsOccupiedBarrier.await()
                    barrier.runAndCheck()
                }
            }
            // Proceed only once every launched task has actually started running.
            threadsOccupiedBarrier.await()
            withContext(Dispatchers.Default) {
                // Put a task in a local queue
                launch(Dispatchers.Default) {
                    barrier.runAndCheck()
                }
                // Put one more task to trick the local queue check
                launch(Dispatchers.Default) {
                    barrier.runAndCheck()
                }

                withContext(Dispatchers.IO) {
                    try {
                        yield()
                    } finally {
                        // Always release the latch: if yield() throws (e.g. on cancellation after a failed
                        // check), tasks blocked in runAndCheck() would otherwise never be unblocked.
                        barrier.countDown()
                    }
                }
            }
        }
    }
}
87