/*
 * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.*

/**
 * Stress test for [Mutex] under continuous cancellation of the coroutines that use it.
 *
 * Three kinds of coroutines run concurrently on a dedicated fixed thread pool:
 *  - "MutexJob-N" workers that repeatedly exercise `holdsLock`, `tryLock`/`unlock`, `withLock` and
 *    `select { onLock }` on one shared mutex, bumping a shared non-atomic counter and a per-worker
 *    atomic counter while holding the lock;
 *  - a "cancellationJob" that keeps cancelling the oldest worker and launching a replacement;
 *  - a "checkProgressJob" that periodically verifies every per-worker counter has advanced.
 *
 * At the end the shared counter must equal the sum of the per-worker counters.
 */
class MutexCancellationStressTest : TestBase() {
    @Test
    fun testStressCancellationDoesNotBreakMutex() = runTest {
        val mutex = Mutex()
        val mutexJobNumber = 3
        val mutexOwners = Array(mutexJobNumber) { "$it" }
        // Pool size: one thread per worker plus one each for the progress checker and the canceller.
        val dispatcher = Executors.newFixedThreadPool(mutexJobNumber + 2).asCoroutineDispatcher()
        // Deliberately non-atomic: it is only modified while the mutex is held.
        var counter = 0
        val counterLocal = Array(mutexJobNumber) { AtomicInteger(0) }
        val completed = AtomicBoolean(false)
        try {
            val mutexJobLauncher: (jobNumber: Int) -> Job = { jobId ->
                val coroutineName = "MutexJob-$jobId"
                // ATOMIC to always have a chance to proceed
                launch(dispatcher + CoroutineName(coroutineName), CoroutineStart.ATOMIC) {
                    while (!completed.get()) {
                        // Stress out holdsLock
                        mutex.holdsLock(mutexOwners[(jobId + 1) % mutexJobNumber])
                        // Stress out lock-like primitives
                        if (mutex.tryLock(mutexOwners[jobId])) {
                            counterLocal[jobId].incrementAndGet()
                            counter++
                            mutex.unlock(mutexOwners[jobId])
                        }
                        mutex.withLock(mutexOwners[jobId]) {
                            counterLocal[jobId].incrementAndGet()
                            counter++
                        }
                        select<Unit> {
                            mutex.onLock(mutexOwners[jobId]) {
                                counterLocal[jobId].incrementAndGet()
                                counter++
                                mutex.unlock(mutexOwners[jobId])
                            }
                        }
                    }
                }
            }
            val mutexJobs = (0 until mutexJobNumber).map { mutexJobLauncher(it) }.toMutableList()
            val checkProgressJob = launch(dispatcher + CoroutineName("checkProgressJob")) {
                var lastCounterLocalSnapshot = (0 until mutexJobNumber).map { 0 }
                while (!completed.get()) {
                    delay(500)
                    // If we've caught the completion after delay, then there is a chance no progress was made whatsoever, bail out
                    if (completed.get()) return@launch
                    val c = counterLocal.map { it.value }
                    for (i in 0 until mutexJobNumber) {
                        // assertTrue rather than assert(): the check must not depend on the JVM running with -ea.
                        assertTrue(
                            c[i] > lastCounterLocalSnapshot[i],
                            "No progress in MutexJob-$i, last observed state: ${c[i]}"
                        )
                    }
                    lastCounterLocalSnapshot = c
                }
            }
            val cancellationJob = launch(dispatcher + CoroutineName("cancellationJob")) {
                var cancellingJobId = 0
                while (!completed.get()) {
                    // Cancel the oldest worker, wait for it to finish, then start a replacement.
                    val jobToCancel = mutexJobs.removeFirst()
                    jobToCancel.cancelAndJoin()
                    mutexJobs += mutexJobLauncher(cancellingJobId)
                    cancellingJobId = (cancellingJobId + 1) % mutexJobNumber
                }
            }
            delay(2000L * stressTestMultiplier)
            completed.set(true)
            // Join the canceller first: it is the only other coroutine mutating mutexJobs.
            cancellationJob.join()
            mutexJobs.forEach { it.join() }
            checkProgressJob.join()
            assertEquals(counter, counterLocal.sumOf { it.value })
        } finally {
            // Runs on every exit path: raise the stop flag for any loop still polling it and
            // close the dispatcher so a failed assertion or cancellation does not leak the pool.
            completed.set(true)
            dispatcher.close()
        }
    }
}