• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import org.junit.*
8 import org.junit.Test
9 import java.util.concurrent.*
10 import kotlin.test.*
11 
12 class JobActivationStressTest : TestBase() {
13     private val N_ITERATIONS = 10_000 * stressTestMultiplier
14     private val pool = newFixedThreadPoolContext(3, "JobActivationStressTest")
15 
16     @After
tearDownnull17     fun tearDown() {
18         pool.close()
19     }
20 
21     /**
22      * Perform concurrent start & cancel of a job with prior installed completion handlers
23      */
24     @Test
25     @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
<lambda>null26     fun testActivation() = runTest {
27         val barrier = CyclicBarrier(3)
28         val scope = CoroutineScope(pool)
29         repeat(N_ITERATIONS) {
30             var wasStarted = false
31             val d = scope.async(NonCancellable, start = CoroutineStart.LAZY) {
32                 wasStarted = true
33                 throw TestException()
34             }
35             // need to add on completion handler
36             val causeHolder = object {
37                 var cause: Throwable? = null
38             }
39             // we use synchronization on causeHolder to work around the fact that completion listeners
40             // are invoked after the job is in the final state, so when "d.join()" completes there is
41             // no guarantee that this listener was already invoked
42             d.invokeOnCompletion {
43                 synchronized(causeHolder) {
44                     causeHolder.cause = it ?: Error("Empty cause")
45                     (causeHolder as Object).notifyAll()
46                 }
47             }
48             // concurrent cancel
49             val canceller = scope.launch {
50                 barrier.await()
51                 d.cancel()
52             }
53             // concurrent cancel
54             val starter = scope.launch {
55                 barrier.await()
56                 d.start()
57             }
58             barrier.await()
59             joinAll(d, canceller, starter)
60             if (wasStarted) {
61                 val exception = d.getCompletionExceptionOrNull()
62                 assertTrue(exception is TestException, "exception=$exception")
63                 val cause = synchronized(causeHolder) {
64                     while (causeHolder.cause == null) (causeHolder as Object).wait()
65                     causeHolder.cause
66                 }
67                 assertTrue(cause is TestException, "cause=$cause")
68             }
69         }
70     }
71 }