• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.scheduling
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.internal.AVAILABLE_PROCESSORS
9 import org.junit.Test
10 import java.util.*
11 import java.util.concurrent.ConcurrentHashMap
12 import java.util.concurrent.CountDownLatch
13 import java.util.concurrent.CyclicBarrier
14 import java.util.concurrent.atomic.AtomicInteger
15 import kotlin.random.*
16 import kotlin.random.Random
17 import kotlin.test.*
18 import kotlin.time.*
19 
/**
 * Stress test for the internal "help the scheduler" entry point
 * `runSingleTaskFromCurrentSystemDispatcher`.
 *
 * Judging by the assertions below, the property under test is that a thread helping
 * on behalf of [Dispatchers.Default] never ends up executing [Dispatchers.IO] tasks.
 */
class CoroutineSchedulerInternalApiStressTest : TestBase() {

    /**
     * Repeats the following scenario `100 * stressTestMultiplierSqrt` times:
     *
     * 1. `AVAILABLE_PROCESSORS - 1` "spawner" coroutines are launched on [Dispatchers.Default].
     *    Each launches `expectedIterations` child tasks (plus, roughly one time in ten,
     *    an extra [Dispatchers.IO] task) and then blocks its thread on `completionLatch`.
     * 2. One "helper" coroutine on [Dispatchers.Default] repeatedly calls
     *    `runSingleTaskFromCurrentSystemDispatcher()` until `jobToComplete` is completed,
     *    checking after every call that no IO task body has run on its thread.
     *
     * The helper is then expected to have run exactly `expectedIterations` tasks and to be
     * among the threads that executed the counted default tasks.
     */
    @Test(timeout = 120_000L)
    fun testHelpDefaultIoIsIsolated() = repeat(100 * stressTestMultiplierSqrt) {
        // Set to `true` on any thread that executes the body of an IO task.
        val ioTaskMarker = ThreadLocal.withInitial { false }
        runTest {
            val jobToComplete = Job()
            val expectedIterations = 100
            val completionLatch = CountDownLatch(1)
            val tasksToCompleteJob = AtomicInteger(expectedIterations)
            val observedIoThreads = Collections.newSetFromMap(ConcurrentHashMap<Thread, Boolean>())
            val observedDefaultThreads = Collections.newSetFromMap(ConcurrentHashMap<Thread, Boolean>())

            // All spawners and the helper start their work at the same moment.
            val barrier = CyclicBarrier(AVAILABLE_PROCESSORS)
            repeat(AVAILABLE_PROCESSORS - 1) {
                // Launch CORES - 1 spawners
                launch(Dispatchers.Default) {
                    barrier.await()
                    repeat(expectedIterations) {
                        launch {
                            val tasksLeft = tasksToCompleteJob.decrementAndGet()
                            if (tasksLeft < 0) return@launch // Leftovers are being executed all over the place
                            observedDefaultThreads.add(Thread.currentThread())
                            if (tasksLeft == 0) {
                                // Verify threads first
                                try {
                                    assertFalse(observedIoThreads.containsAll(observedDefaultThreads))
                                } finally {
                                    // Always release the helper loop, even if the assertion failed.
                                    jobToComplete.complete()
                                }
                            }
                        }

                        // Sometimes launch an IO task to mess with a scheduler
                        if (Random.nextInt(0..9) == 0) {
                            launch(Dispatchers.IO) {
                                ioTaskMarker.set(true)
                                observedIoThreads.add(Thread.currentThread())
                                assertTrue(Thread.currentThread().isIoDispatcherThread())
                            }
                        }
                    }
                    // Blocks the spawner's thread until the helper is done; presumably this
                    // leaves the helper as the only thread free to run the spawned tasks.
                    completionLatch.await()
                }
            }

            withContext(Dispatchers.Default) {
                barrier.await()
                var timesHelped = 0
                try {
                    while (!jobToComplete.isCompleted) {
                        val result = runSingleTaskFromCurrentSystemDispatcher()
                        // The helper thread must never have executed an IO task body.
                        assertFalse(ioTaskMarker.get())
                        when {
                            // Zero is counted as "a task was run": retry immediately.
                            result == 0L -> timesHelped++
                            // A positive result is treated as a number of nanoseconds to wait.
                            result > 0L -> Thread.sleep(result.toDuration(DurationUnit.NANOSECONDS).toDelayMillis())
                            // A negative result: back off briefly before polling again.
                            else -> Thread.sleep(10)
                        }
                    }
                } finally {
                    // Release the spawners even when an assertion above fails. They are parked
                    // in a blocking await() that cancellation cannot interrupt, so without this
                    // a failure would surface only as a test timeout.
                    completionLatch.countDown()
                }
                assertEquals(expectedIterations, timesHelped)
                assertTrue(Thread.currentThread() in observedDefaultThreads, observedDefaultThreads.toString())
            }
        }
    }
}
89 
90