• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.exceptions.*
10 import kotlin.coroutines.*
11 import kotlin.test.*
12 
13 class LimitedParallelismConcurrentTest : TestBase() {
14 
15     private val targetParallelism = 4
16     private val iterations = 100_000
17     private val parallelism = atomic(0)
18 
checkParallelismnull19     private fun checkParallelism() {
20         val value = parallelism.incrementAndGet()
21         randomWait()
22         assertTrue { value <= targetParallelism }
23         parallelism.decrementAndGet()
24     }
25 
26     @Test
<lambda>null27     fun testLimitedExecutor() = runTest {
28         val executor = newFixedThreadPoolContext(targetParallelism, "test")
29         val view = executor.limitedParallelism(targetParallelism)
30         doStress {
31             repeat(iterations) {
32                 launch(view) {
33                     checkParallelism()
34                 }
35             }
36         }
37         executor.close()
38     }
39 
doStressnull40     private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) {
41         repeat(stressTestMultiplier) {
42             coroutineScope {
43                 block()
44             }
45         }
46     }
47 
48     @Test
<lambda>null49     fun testTaskFairness() = runTest {
50         val executor = newSingleThreadContext("test")
51         val view = executor.limitedParallelism(1)
52         val view2 = executor.limitedParallelism(1)
53         val j1 = launch(view) {
54             while (true) {
55                 yield()
56             }
57         }
58         val j2 = launch(view2) { j1.cancel() }
59         joinAll(j1, j2)
60         executor.close()
61     }
62 
63     /**
64      * Tests that, when no tasks are present, the limited dispatcher does not dispatch any tasks.
65      * This is important for the case when a dispatcher is closeable and the [CoroutineDispatcher.limitedParallelism]
66      * machinery could trigger a dispatch after the dispatcher is closed.
67      */
68     @Test
<lambda>null69     fun testNotDoingDispatchesWhenNoTasksArePresent() = runTest {
70         class NaggingDispatcher: CoroutineDispatcher() {
71             val closed = atomic(false)
72             override fun dispatch(context: CoroutineContext, block: Runnable) {
73                 if (closed.value)
74                     fail("Dispatcher was closed, but still dispatched a task")
75                 Dispatchers.Default.dispatch(context, block)
76             }
77             fun close() {
78                 closed.value = true
79             }
80         }
81         repeat(stressTestMultiplier * 500_000) {
82             val dispatcher = NaggingDispatcher()
83             val view = dispatcher.limitedParallelism(1)
84             val deferred = CompletableDeferred<Unit>()
85             val job = launch(view) {
86                 deferred.await()
87             }
88             launch(Dispatchers.Default) {
89                 deferred.complete(Unit)
90             }
91             job.join()
92             dispatcher.close()
93         }
94     }
95 }
96