• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.scheduling
6 
7 import kotlinx.coroutines.*
8 import org.junit.Test
9 import java.util.concurrent.*
10 import java.util.concurrent.atomic.AtomicInteger
11 
/**
 * Checks that tasks dispatched to [Dispatchers.Default] are never executed by more than
 * [CORE_POOL_SIZE] threads at the same time.
 *
 * Both tests share the same shape: all but one of the default threads are parked on a latch,
 * two more tasks are launched from inside the dispatcher, and the launching coroutine then
 * switches to [Dispatchers.IO] before releasing the latch. Every parked task goes through
 * [runAndCheck], which fails as soon as too many of them are running concurrently.
 */
class CoroutineSchedulerOversubscriptionTest : TestBase() {

    /** Number of callers that are currently inside [runAndCheck]. */
    private val inDefault = AtomicInteger(0)

    /**
     * Registers the caller in [inDefault], verifies the concurrency limit and then blocks the
     * calling thread until this latch reaches zero, after which the caller is unregistered.
     *
     * Note that the counter is not decremented when the check fails.
     *
     * @throws IllegalStateException if more than [CORE_POOL_SIZE] callers are inside this
     * function at once.
     */
    private fun CountDownLatch.runAndCheck() {
        check(inDefault.incrementAndGet() <= CORE_POOL_SIZE) { "Oversubscription detected" }

        await()
        inDefault.decrementAndGet()
    }

    /**
     * Single-shot variant: the coroutine that switched to [Dispatchers.IO] suspends in `delay(100)`
     * before releasing the latch, giving the two extra tasks time to be picked up.
     */
    @Test
    fun testOverSubscriptionDeterministic() = runTest {
        val barrier = CountDownLatch(1)
        // CORE_POOL_SIZE - 1 launched tasks plus the test thread itself.
        val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE)
        // All threads but one
        repeat(CORE_POOL_SIZE - 1) {
            launch(Dispatchers.Default) {
                threadsOccupiedBarrier.await()
                barrier.runAndCheck()
            }
        }
        // Proceed only once every launched task has actually started running.
        threadsOccupiedBarrier.await()
        withContext(Dispatchers.Default) {
            // Put a task in a local queue, it will be stolen
            launch(Dispatchers.Default) {
                barrier.runAndCheck()
            }
            // Put one more task to trick the local queue check
            launch(Dispatchers.Default) {
                barrier.runAndCheck()
            }

            withContext(Dispatchers.IO) {
                try {
                    // Release the thread
                    delay(100)
                } finally {
                    // Always unblock the parked tasks, even if this coroutine was cancelled.
                    barrier.countDown()
                }
            }
        }
    }

    /**
     * Stress variant: repeats the scenario `1000 * stressTestMultiplierSqrt` times, using
     * `yield()` instead of a delay before releasing the latch. [inDefault] is reset before
     * every iteration.
     */
    @Test
    fun testOverSubscriptionStress() = repeat(1000 * stressTestMultiplierSqrt) {
        inDefault.set(0)
        runTest {
            val barrier = CountDownLatch(1)
            // CORE_POOL_SIZE - 1 launched tasks plus the test thread itself.
            val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE)
            // All threads but one
            repeat(CORE_POOL_SIZE - 1) {
                launch(Dispatchers.Default) {
                    threadsOccupiedBarrier.await()
                    barrier.runAndCheck()
                }
            }
            // Proceed only once every launched task has actually started running.
            threadsOccupiedBarrier.await()
            withContext(Dispatchers.Default) {
                // Put a task in a local queue
                launch(Dispatchers.Default) {
                    barrier.runAndCheck()
                }
                // Put one more task to trick the local queue check
                launch(Dispatchers.Default) {
                    barrier.runAndCheck()
                }

                withContext(Dispatchers.IO) {
                    try {
                        yield()
                    } finally {
                        // Count down even if yield() throws on cancellation (for example after a
                        // sibling task failed its oversubscription check). Otherwise the tasks
                        // blocked in runAndCheck are never released and the test hangs instead
                        // of reporting the failure.
                        barrier.countDown()
                    }
                }
            }
        }
    }
}
90