1 /* 2 * Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.scheduling 6 7 import kotlinx.coroutines.* 8 import kotlinx.coroutines.internal.AVAILABLE_PROCESSORS 9 import org.junit.Test 10 import java.util.* 11 import java.util.concurrent.ConcurrentHashMap 12 import java.util.concurrent.CountDownLatch 13 import java.util.concurrent.CyclicBarrier 14 import java.util.concurrent.atomic.AtomicInteger 15 import kotlin.random.* 16 import kotlin.random.Random 17 import kotlin.test.* 18 import kotlin.time.* 19 20 class CoroutineSchedulerInternalApiStressTest : TestBase() { 21 22 @Test(timeout = 120_000L) <lambda>null23 fun testHelpDefaultIoIsIsolated() = repeat(100 * stressTestMultiplierSqrt) { 24 val ioTaskMarker = ThreadLocal.withInitial { false } 25 runTest { 26 val jobToComplete = Job() 27 val expectedIterations = 100 28 val completionLatch = CountDownLatch(1) 29 val tasksToCompleteJob = AtomicInteger(expectedIterations) 30 val observedIoThreads = Collections.newSetFromMap(ConcurrentHashMap<Thread, Boolean>()) 31 val observedDefaultThreads = Collections.newSetFromMap(ConcurrentHashMap<Thread, Boolean>()) 32 33 val barrier = CyclicBarrier(AVAILABLE_PROCESSORS) 34 val spawners = ArrayList<Job>() 35 repeat(AVAILABLE_PROCESSORS - 1) { 36 // Launch CORES - 1 spawners 37 spawners += launch(Dispatchers.Default) { 38 barrier.await() 39 repeat(expectedIterations) { 40 launch { 41 val tasksLeft = tasksToCompleteJob.decrementAndGet() 42 if (tasksLeft < 0) return@launch // Leftovers are being executed all over the place 43 observedDefaultThreads.add(Thread.currentThread()) 44 if (tasksLeft == 0) { 45 // Verify threads first 46 try { 47 assertFalse(observedIoThreads.containsAll(observedDefaultThreads)) 48 } finally { 49 jobToComplete.complete() 50 } 51 } 52 } 53 54 // Sometimes launch an IO task to mess with a scheduler 55 if (Random.nextInt(0..9) == 0) { 56 launch(Dispatchers.IO) { 57 ioTaskMarker.set(true) 58 observedIoThreads.add(Thread.currentThread()) 59 assertTrue(Thread.currentThread().isIoDispatcherThread()) 60 } 61 } 62 } 63 completionLatch.await() 64 } 65 } 66 67 withContext(Dispatchers.Default) { 68 barrier.await() 69 var timesHelped = 0 70 while (!jobToComplete.isCompleted) { 71 val result = runSingleTaskFromCurrentSystemDispatcher() 72 assertFalse(ioTaskMarker.get()) 73 if (result == 0L) { 74 ++timesHelped 75 continue 76 } else if (result >= 0L) { 77 Thread.sleep(result.toDuration(DurationUnit.NANOSECONDS).toDelayMillis()) 78 } else { 79 Thread.sleep(10) 80 } 81 } 82 completionLatch.countDown() 83 assertEquals(100, timesHelped) 84 assertTrue(Thread.currentThread() in observedDefaultThreads, observedDefaultThreads.toString()) 85 } 86 } 87 } 88 } 89 90