package kotlinx.coroutines

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.exceptions.*
import kotlinx.coroutines.internal.*
import kotlin.test.*

/**
 * Stress test for exception aggregation when several children fail concurrently.
 *
 * Every iteration starts [nWorkers] child coroutines, each on its own worker dispatcher,
 * and each child throws a [StressException] tagged with its worker index. The test then
 * checks that the parent's completion exception together with its suppressed exceptions
 * contains every index exactly once.
 */
class ConcurrentExceptionsStressTest : TestBase() {
    private val nWorkers = 4
    private val nRepeat = 1000 * stressTestMultiplier

    // Populated by testStress(); stays empty if the test never ran, so tearDown() is a no-op then.
    private var workers: Array<CloseableCoroutineDispatcher> = emptyArray()

    /** Closes every dispatcher that [testStress] created. */
    @AfterTest
    fun tearDown() {
        for (worker in workers) {
            worker.close()
        }
    }

    /** Creates one single-thread context per worker and runs [testOnce] [nRepeat] times. */
    @Test
    fun testStress() = runTest {
        workers = Array(nWorkers) { workerIndex ->
            newSingleThreadContext("JobExceptionsStressTest-$workerIndex")
        }

        for (iteration in 0 until nRepeat) {
            testOnce()
        }
    }

    /**
     * Runs a single round: launches one failing child per worker inside an `async` parent,
     * waits for the parent to complete, and verifies the set of reported exceptions.
     */
    @Suppress("SuspendFunctionOnCoroutineScope") // workaround native inline fun stacktraces
    private suspend fun CoroutineScope.testOnce() {
        val parent = async(NonCancellable) {
            for (workerIndex in 0 until nWorkers) {
                // Atomic start: the child is always launched, even if the parent job was already cancelled
                launch(workers[workerIndex], start = CoroutineStart.ATOMIC) {
                    randomWait()
                    throw StressException(workerIndex)
                }
            }
        }
        parent.join()
        assertTrue(parent.isCancelled)

        // The completion exception itself must be one of the children's failures.
        val primary = when (val failure = parent.getCompletionExceptionOrNull()) {
            is StressException -> failure
            else -> unexpectedException("completion", failure)
        }

        // The remaining failures are expected to be attached as suppressed exceptions.
        val suppressedIndices = primary.suppressedExceptions.mapIndexed { position, suppressed ->
            when (suppressed) {
                is StressException -> suppressed.index
                else -> unexpectedException("suppressed $position", suppressed)
            }
        }
        val indices = listOf(primary.index) + suppressedIndices

        for (expected in 0 until nWorkers) {
            assertTrue(expected in indices, "Exception $expected is missing: $indices")
        }
        // All indices are present at this point, so a size mismatch can only mean duplicates.
        assertEquals(nWorkers, indices.size, "Duplicated exceptions in list: $indices")
    }

    /** Fails the test with an [IllegalStateException] that wraps the unexpected exception [e]. */
    private fun unexpectedException(msg: String, e: Throwable?): Nothing =
        throw IllegalStateException("Unexpected $msg exception", e)

    /** Failure thrown by a child coroutine; [index] identifies the worker that threw it. */
    private class StressException(val index: Int) : Throwable()
}