1 /* 2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import org.junit.* 8 import org.junit.Test 9 import java.util.concurrent.* 10 11 class AwaitStressTest : TestBase() { 12 13 private val iterations = 50_000 * stressTestMultiplier 14 @get:Rule 15 public val pool = ExecutorRule(4) 16 17 @Test <lambda>null18 fun testMultipleExceptions() = runTest { 19 val ctx = pool + NonCancellable 20 repeat(iterations) { 21 val barrier = CyclicBarrier(4) 22 val d1 = async(ctx) { 23 barrier.await() 24 throw TestException() 25 } 26 val d2 = async(ctx) { 27 barrier.await() 28 throw TestException() 29 } 30 val d3 = async(ctx) { 31 barrier.await() 32 1L 33 } 34 try { 35 barrier.await() 36 awaitAll(d1, d2, d3) 37 expectUnreached() 38 } catch (e: TestException) { 39 // Expected behaviour 40 } 41 42 barrier.reset() 43 } 44 } 45 46 @Test <lambda>null47 fun testAwaitAll() = runTest { 48 val barrier = CyclicBarrier(3) 49 repeat(iterations) { 50 val d1 = async(pool) { 51 barrier.await() 52 1L 53 } 54 val d2 = async(pool) { 55 barrier.await() 56 2L 57 } 58 barrier.await() 59 awaitAll(d1, d2) 60 require(d1.isCompleted && d2.isCompleted) 61 barrier.reset() 62 } 63 } 64 65 @Test <lambda>null66 fun testConcurrentCancellation() = runTest { 67 var cancelledOnce = false 68 repeat(iterations) { 69 val barrier = CyclicBarrier(3) 70 71 val d1 = async(pool) { 72 barrier.await() 73 delay(10_000) 74 yield() 75 } 76 77 val d2 = async(pool) { 78 barrier.await() 79 d1.cancel() 80 } 81 82 barrier.await() 83 try { 84 awaitAll(d1, d2) 85 } catch (e: CancellationException) { 86 cancelledOnce = true 87 } 88 } 89 90 require(cancelledOnce) { "Cancellation exception wasn't properly caught" } 91 } 92 93 @Test <lambda>null94 fun testMutatingCollection() = runTest { 95 val barrier = CyclicBarrier(4) 96 97 repeat(iterations) { 98 // thread-safe collection that we are going to modify 99 val deferreds = CopyOnWriteArrayList<Deferred<Long>>() 100 101 deferreds += async(pool) { 102 barrier.await() 103 1L 104 } 105 106 deferreds += async(pool) { 107 barrier.await() 108 2L 109 } 110 111 deferreds += async(pool) { 112 barrier.await() 113 deferreds.removeAt(2) 114 3L 115 } 116 117 val allJobs = ArrayList(deferreds) 118 barrier.await() 119 val results = deferreds.awaitAll() // shouldn't hang 120 check(results == listOf(1L, 2L, 3L) || results == listOf(1L, 2L)) 121 allJobs.awaitAll() 122 barrier.reset() 123 } 124 } 125 } 126