• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.exceptions.*
5 import kotlinx.coroutines.internal.*
6 import kotlin.test.*
7 
8 class ConcurrentExceptionsStressTest : TestBase() {
9     private val nWorkers = 4
10     private val nRepeat = 1000 * stressTestMultiplier
11 
12     private var workers: Array<CloseableCoroutineDispatcher> = emptyArray()
13 
14     @AfterTest
15     fun tearDown() {
16         workers.forEach {
17             it.close()
18         }
19     }
20 
21     @Test
22     fun testStress() = runTest {
23         workers = Array(nWorkers) { index ->
24             newSingleThreadContext("JobExceptionsStressTest-$index")
25         }
26 
27         repeat(nRepeat) {
28             testOnce()
29         }
30     }
31 
32     @Suppress("SuspendFunctionOnCoroutineScope") // workaround native inline fun stacktraces
33     private suspend fun CoroutineScope.testOnce() {
34         val deferred = async(NonCancellable) {
35             repeat(nWorkers) { index ->
36                 // Always launch a coroutine even if parent job was already cancelled (atomic start)
37                 launch(workers[index], start = CoroutineStart.ATOMIC) {
38                     randomWait()
39                     throw StressException(index)
40                 }
41             }
42         }
43         deferred.join()
44         assertTrue(deferred.isCancelled)
45         val completionException = deferred.getCompletionExceptionOrNull()
46         val cause = completionException as? StressException
47             ?: unexpectedException("completion", completionException)
48         val suppressed = cause.suppressedExceptions
49         val indices = listOf(cause.index) + suppressed.mapIndexed { index, e ->
50             (e as? StressException)?.index ?: unexpectedException("suppressed $index", e)
51         }
52         repeat(nWorkers) { index ->
53             assertTrue(index in indices, "Exception $index is missing: $indices")
54         }
55         assertEquals(nWorkers, indices.size, "Duplicated exceptions in list: $indices")
56     }
57 
58     private fun unexpectedException(msg: String, e: Throwable?): Nothing {
59         throw IllegalStateException("Unexpected $msg exception", e)
60     }
61 
62     private class StressException(val index: Int) : Throwable()
63 }
64 
65