• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.internal.*
10 import kotlin.native.concurrent.*
11 import kotlin.test.*
12 import kotlin.time.Duration.Companion.seconds
13 
14 private class BlockingBarrier(val n: Int) {
15     val counter = atomic(0)
16     val wakeUp = Channel<Unit>(n - 1)
awaitnull17     fun await() {
18         val count = counter.addAndGet(1)
19         if (count == n) {
20             repeat(n - 1) {
21                 runBlocking {
22                     wakeUp.send(Unit)
23                 }
24             }
25         } else if (count < n) {
26             runBlocking {
27                 wakeUp.receive()
28             }
29         }
30     }
31 }
32 
33 class MultithreadedDispatchersTest {
34     /**
35      * Test that [newFixedThreadPoolContext] does not allocate more dispatchers than it needs to.
36      * Incidentally also tests that it will allocate enough workers for its needs. Otherwise, the test will hang.
37      */
38     @Test
testNotAllocatingExtraDispatchersnull39     fun testNotAllocatingExtraDispatchers() {
40         val barrier = BlockingBarrier(2)
41         val lock = SynchronizedObject()
42         suspend fun spin(set: MutableSet<Worker>) {
43             repeat(100) {
44                 synchronized(lock) { set.add(Worker.current) }
45                 delay(1)
46             }
47         }
48         val dispatcher = newFixedThreadPoolContext(64, "test")
49         try {
50             runBlocking {
51                 val encounteredWorkers = mutableSetOf<Worker>()
52                 val coroutine1 = launch(dispatcher) {
53                     barrier.await()
54                     spin(encounteredWorkers)
55                 }
56                 val coroutine2 = launch(dispatcher) {
57                     barrier.await()
58                     spin(encounteredWorkers)
59                 }
60                 listOf(coroutine1, coroutine2).joinAll()
61                 assertEquals(2, encounteredWorkers.size)
62             }
63         } finally {
64             dispatcher.close()
65         }
66     }
67 
68     /**
69      * Test that [newSingleThreadContext] will not wait for the cancelled scheduled coroutines before closing.
70      */
71     @Test
<lambda>null72     fun timeoutsNotPreventingClosing(): Unit = runBlocking {
73         val dispatcher = WorkerDispatcher("test")
74         withContext(dispatcher) {
75             withTimeout(5.seconds) {
76             }
77         }
78         withTimeout(1.seconds) {
79             dispatcher.close() // should not wait for the timeout
80             yield()
81         }
82     }
83 }
84