• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.internal.*
8 import kotlinx.coroutines.selects.*
9 import kotlinx.coroutines.sync.*
10 import org.junit.*
11 import org.junit.Test
12 import java.util.concurrent.*
13 import java.util.concurrent.atomic.AtomicBoolean
14 import java.util.concurrent.atomic.AtomicInteger
15 import kotlin.test.*
16 
17 class MutexCancellationStressTest : TestBase() {
18     @Test
19     fun testStressCancellationDoesNotBreakMutex() = runTest {
20         val mutex = Mutex()
21         val mutexJobNumber = 3
22         val mutexOwners = Array(mutexJobNumber) { "$it" }
23         val dispatcher = Executors.newFixedThreadPool(mutexJobNumber + 2).asCoroutineDispatcher()
24         var counter = 0
25         val counterLocal = Array(mutexJobNumber) { AtomicInteger(0) }
26         val completed = AtomicBoolean(false)
27         val mutexJobLauncher: (jobNumber: Int) -> Job = { jobId ->
28             val coroutineName = "MutexJob-$jobId"
29             // ATOMIC to always have a chance to proceed
30             launch(dispatcher + CoroutineName(coroutineName), CoroutineStart.ATOMIC) {
31                 while (!completed.get()) {
32                     // Stress out holdsLock
33                     mutex.holdsLock(mutexOwners[(jobId + 1) % mutexJobNumber])
34                     // Stress out lock-like primitives
35                     if (mutex.tryLock(mutexOwners[jobId])) {
36                         counterLocal[jobId].incrementAndGet()
37                         counter++
38                         mutex.unlock(mutexOwners[jobId])
39                     }
40                     mutex.withLock(mutexOwners[jobId]) {
41                         counterLocal[jobId].incrementAndGet()
42                         counter++
43                     }
44                     select<Unit> {
45                         mutex.onLock(mutexOwners[jobId]) {
46                             counterLocal[jobId].incrementAndGet()
47                             counter++
48                             mutex.unlock(mutexOwners[jobId])
49                         }
50                     }
51                 }
52             }
53         }
54         val mutexJobs = (0 until mutexJobNumber).map { mutexJobLauncher(it) }.toMutableList()
55         val checkProgressJob = launch(dispatcher + CoroutineName("checkProgressJob")) {
56             var lastCounterLocalSnapshot = (0 until mutexJobNumber).map { 0 }
57             while (!completed.get()) {
58                 delay(500)
59                 // If we've caught the completion after delay, then there is a chance no progress were made whatsoever, bail out
60                 if (completed.get()) return@launch
61                 val c = counterLocal.map { it.value }
62                 for (i in 0 until mutexJobNumber) {
63                     assert(c[i] > lastCounterLocalSnapshot[i]) { "No progress in MutexJob-$i, last observed state: ${c[i]}" }
64                 }
65                 lastCounterLocalSnapshot = c
66             }
67         }
68         val cancellationJob = launch(dispatcher + CoroutineName("cancellationJob")) {
69             var cancellingJobId = 0
70             while (!completed.get()) {
71                 val jobToCancel = mutexJobs.removeFirst()
72                 jobToCancel.cancelAndJoin()
73                 mutexJobs += mutexJobLauncher(cancellingJobId)
74                 cancellingJobId = (cancellingJobId + 1) % mutexJobNumber
75             }
76         }
77         delay(2000L * stressTestMultiplier)
78         completed.set(true)
79         cancellationJob.join()
80         mutexJobs.forEach { it.join() }
81         checkProgressJob.join()
82         assertEquals(counter, counterLocal.sumOf { it.value })
83         dispatcher.close()
84     }
85 }
86