/*
 * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.scheduling

import kotlinx.coroutines.*
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger

/**
 * Verifies that no more than [CORE_POOL_SIZE] tasks are ever running on [Dispatchers.Default] at the same time.
 *
 * Every task launched on [Dispatchers.Default] registers itself in [inDefault] and then blocks its thread on a
 * shared [CountDownLatch]. If the number of simultaneously registered tasks exceeds [CORE_POOL_SIZE],
 * the task fails with "Oversubscription detected".
 */
class CoroutineSchedulerOversubscriptionTest : TestBase() {

    /** Number of tasks that are currently inside [runAndCheck]. */
    private val inDefault = AtomicInteger(0)

    /**
     * Registers the calling task in [inDefault], fails if more than [CORE_POOL_SIZE] tasks are registered,
     * then blocks the current thread until this latch is released and unregisters the task.
     *
     * @throws IllegalStateException if the number of registered tasks exceeds [CORE_POOL_SIZE].
     */
    private fun CountDownLatch.runAndCheck() {
        check(inDefault.incrementAndGet() <= CORE_POOL_SIZE) { "Oversubscription detected" }

        await()
        inDefault.decrementAndGet()
    }

    /**
     * Single-shot scenario: the blocked tasks are released only after the coroutine has spent
     * 100 ms suspended on [Dispatchers.IO].
     */
    @Test
    fun testOverSubscriptionDeterministic() = runTest {
        val barrier = CountDownLatch(1)
        val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE)
        // All threads but one
        repeat(CORE_POOL_SIZE - 1) {
            launch(Dispatchers.Default) {
                threadsOccupiedBarrier.await()
                barrier.runAndCheck()
            }
        }
        // The test thread is the last party of the cyclic barrier
        threadsOccupiedBarrier.await()
        withContext(Dispatchers.Default) {
            // Put a task in a local queue, it will be stolen
            launch(Dispatchers.Default) {
                barrier.runAndCheck()
            }
            // Put one more task to trick the local queue check
            launch(Dispatchers.Default) {
                barrier.runAndCheck()
            }

            withContext(Dispatchers.IO) {
                try {
                    // Release the thread
                    delay(100)
                } finally {
                    // Always unblock the tasks waiting on the latch, even if this coroutine was cancelled
                    barrier.countDown()
                }
            }
        }
    }

    /**
     * Stress variant of [testOverSubscriptionDeterministic]: the same scenario is repeated
     * `1000 * stressTestMultiplierSqrt` times, releasing the latch right after a [yield] on [Dispatchers.IO].
     */
    @Test
    fun testOverSubscriptionStress() = repeat(1000 * stressTestMultiplierSqrt) {
        // Start every iteration from a clean counter
        inDefault.set(0)
        runTest {
            val barrier = CountDownLatch(1)
            val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE)
            // All threads but one
            repeat(CORE_POOL_SIZE - 1) {
                launch(Dispatchers.Default) {
                    threadsOccupiedBarrier.await()
                    barrier.runAndCheck()
                }
            }
            // The test thread is the last party of the cyclic barrier
            threadsOccupiedBarrier.await()
            withContext(Dispatchers.Default) {
                // Put a task in a local queue
                launch(Dispatchers.Default) {
                    barrier.runAndCheck()
                }
                // Put one more task to trick the local queue check
                launch(Dispatchers.Default) {
                    barrier.runAndCheck()
                }

                withContext(Dispatchers.IO) {
                    try {
                        yield()
                    } finally {
                        // yield() throws if the scope was cancelled (e.g. oversubscription was detected in
                        // a sibling task). The latch must still be released, otherwise the tasks blocked in
                        // runAndCheck() never finish and the test hangs instead of failing.
                        barrier.countDown()
                    }
                }
            }
        }
    }
}