1 package kotlinx.coroutines.scheduling 2 3 import kotlinx.coroutines.testing.* 4 import kotlinx.coroutines.* 5 import org.junit.Test 6 import java.util.concurrent.* 7 import java.util.concurrent.atomic.AtomicInteger 8 9 class CoroutineSchedulerOversubscriptionTest : TestBase() { 10 11 private val inDefault = AtomicInteger(0) 12 runAndChecknull13 private fun CountDownLatch.runAndCheck() { 14 if (inDefault.incrementAndGet() > CORE_POOL_SIZE) { 15 error("Oversubscription detected") 16 } 17 18 await() 19 inDefault.decrementAndGet() 20 } 21 22 @Test testOverSubscriptionDeterministicnull23 fun testOverSubscriptionDeterministic() = runTest { 24 val barrier = CountDownLatch(1) 25 val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE) 26 // All threads but one 27 repeat(CORE_POOL_SIZE - 1) { 28 launch(Dispatchers.Default) { 29 threadsOccupiedBarrier.await() 30 barrier.runAndCheck() 31 } 32 } 33 threadsOccupiedBarrier.await() 34 withContext(Dispatchers.Default) { 35 // Put a task in a local queue, it will be stolen 36 launch(Dispatchers.Default) { 37 barrier.runAndCheck() 38 } 39 // Put one more task to trick the local queue check 40 launch(Dispatchers.Default) { 41 barrier.runAndCheck() 42 } 43 44 withContext(Dispatchers.IO) { 45 try { 46 // Release the thread 47 delay(100) 48 } finally { 49 barrier.countDown() 50 } 51 } 52 } 53 } 54 55 @Test <lambda>null56 fun testOverSubscriptionStress() = repeat(1000 * stressTestMultiplierSqrt) { 57 inDefault.set(0) 58 runTest { 59 val barrier = CountDownLatch(1) 60 val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE) 61 // All threads but one 62 repeat(CORE_POOL_SIZE - 1) { 63 launch(Dispatchers.Default) { 64 threadsOccupiedBarrier.await() 65 barrier.runAndCheck() 66 } 67 } 68 threadsOccupiedBarrier.await() 69 withContext(Dispatchers.Default) { 70 // Put a task in a local queue 71 launch(Dispatchers.Default) { 72 barrier.runAndCheck() 73 } 74 // Put one more task to trick the local queue check 75 launch(Dispatchers.Default) { 76 barrier.runAndCheck() 77 } 78 79 withContext(Dispatchers.IO) { 80 yield() 81 barrier.countDown() 82 } 83 } 84 } 85 } 86 } 87