• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.exceptions.*
8 import kotlinx.coroutines.internal.*
9 import kotlin.test.*
10 
/**
 * Stress test for exceptions thrown concurrently by several child coroutines.
 *
 * Each iteration starts [nWorkers] children, one per dispatcher in [workers], and every child throws
 * its own [StressException]. The test then checks that the parent completed exceptionally and that the
 * index of every child shows up exactly once among the completion exception and its suppressed exceptions.
 */
class ConcurrentExceptionsStressTest : TestBase() {
    private val nWorkers = 4
    private val nRepeat = 1000 * stressTestMultiplier

    // Filled in by testStress(); starts empty so tearDown() has nothing to close if the test never ran.
    private var workers: Array<CloseableCoroutineDispatcher> = emptyArray()

    /** Closes every dispatcher created by the test. */
    @AfterTest
    fun tearDown() {
        for (worker in workers) {
            worker.close()
        }
    }

    /** Creates the worker dispatchers and runs [testOnce] [nRepeat] times. */
    @Test
    fun testStress() = runMtTest {
        // NOTE(review): the thread-name prefix differs from the class name; kept as is.
        workers = Array(nWorkers) { newSingleThreadContext("JobExceptionsStressTest-$it") }

        repeat(nRepeat) {
            testOnce()
        }
    }

    /**
     * Runs a single round: starts one failing child per worker under a common parent and verifies
     * that all [nWorkers] exceptions are reported by that parent without losses or duplicates.
     */
    @Suppress("SuspendFunctionOnCoroutineScope") // workaround native inline fun stacktraces
    private suspend fun CoroutineScope.testOnce() {
        val parent = async(NonCancellable) {
            for (workerIndex in 0 until nWorkers) {
                // Always launch a coroutine even if parent job was already cancelled (atomic start)
                launch(workers[workerIndex], start = CoroutineStart.ATOMIC) {
                    randomWait()
                    throw StressException(workerIndex)
                }
            }
        }
        parent.join()
        assertTrue(parent.isCancelled)

        // The completion exception itself must be one of the children's exceptions.
        val failure = parent.getCompletionExceptionOrNull()
        if (failure !is StressException) unexpectedException("completion", failure)

        // Collect the index carried by the completion exception followed by those of the suppressed ones.
        val reported = mutableListOf(failure.index)
        failure.suppressed.forEachIndexed { position, extra ->
            if (extra !is StressException) unexpectedException("suppressed $position", extra)
            reported += extra.index
        }

        for (expected in 0 until nWorkers) {
            assertTrue(expected in reported, "Exception $expected is missing: $reported")
        }
        assertEquals(nWorkers, reported.size, "Duplicated exceptions in list: $reported")
    }

    /** Fails the test with an [IllegalStateException] that names the offending slot and wraps [e] as its cause. */
    private fun unexpectedException(msg: String, e: Throwable?): Nothing =
        throw IllegalStateException("Unexpected $msg exception", e)

    /** Exception thrown by the child coroutine with the given [index]. */
    private class StressException(val index: Int) : SuppressSupportingThrowable()
}
67 
68