/*
 * Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlin.native.concurrent.*
import kotlin.test.*
import kotlin.time.Duration.Companion.seconds

/**
 * A single-use barrier for [n] parties that blocks the calling thread instead of suspending.
 *
 * The first `n - 1` callers of [await] block in [runBlocking] until the `n`-th caller arrives and
 * sends one wake-up signal for each of them. Calls made after the `n`-th one return immediately.
 *
 * @param n the number of parties that must call [await] before any of them is released; must be at least 1.
 * @throws IllegalArgumentException if [n] is less than 1.
 */
private class BlockingBarrier(val n: Int) {
    init {
        // `n - 1` is used as the channel capacity below. Negative capacities are reserved for special
        // channel kinds (e.g. `Channel.CONFLATED`), so a non-positive `n` would silently create a
        // channel with unintended semantics instead of failing.
        require(n >= 1) { "BlockingBarrier requires at least one party, but n = $n" }
    }

    /** The number of [await] calls made so far. */
    val counter = atomic(0)

    /** Carries one wake-up signal for every party that arrived before the last one. */
    val wakeUp = Channel<Unit>(n - 1)

    /**
     * Blocks the current thread until [n] parties in total have called this function.
     *
     * The party that arrives last does not wait; it releases all the others.
     */
    fun await() {
        val count = counter.addAndGet(1)
        if (count == n) {
            // Last party to arrive: release every waiter.
            repeat(n - 1) {
                runBlocking {
                    wakeUp.send(Unit)
                }
            }
        } else if (count < n) {
            // Early party: block this thread until the last party sends a wake-up signal.
            runBlocking {
                wakeUp.receive()
            }
        }
    }
}

class MultithreadedDispatchersTest {
    /**
     * Test that [newFixedThreadPoolContext] does not allocate more dispatchers than it needs to.
     * Incidentally also tests that it will allocate enough workers for its needs. Otherwise, the test will hang.
     */
    @Test
    fun testNotAllocatingExtraDispatchers() {
        val barrier = BlockingBarrier(2)
        val lock = SynchronizedObject()

        // Repeatedly records the worker that the calling coroutine is currently running on.
        suspend fun spin(set: MutableSet<Worker>) {
            repeat(100) {
                // The set is shared between both coroutines, so every access is guarded by `lock`.
                synchronized(lock) { set.add(Worker.current) }
                delay(1)
            }
        }

        val dispatcher = newFixedThreadPoolContext(64, "test")
        try {
            runBlocking {
                val encounteredWorkers = mutableSetOf<Worker>()
                // Each coroutine blocks its thread on the barrier until the other one has also started,
                // so the two of them have to be running at the same time.
                val coroutine1 = launch(dispatcher) {
                    barrier.await()
                    spin(encounteredWorkers)
                }
                val coroutine2 = launch(dispatcher) {
                    barrier.await()
                    spin(encounteredWorkers)
                }
                listOf(coroutine1, coroutine2).joinAll()
                // Only two coroutines ever ran on the dispatcher, so exactly two workers must have been observed.
                assertEquals(2, encounteredWorkers.size)
            }
        } finally {
            dispatcher.close()
        }
    }

    /**
     * Test that [newSingleThreadContext] will not wait for the cancelled scheduled coroutines before closing.
     *
     * The dispatcher under test is created directly as a `WorkerDispatcher`.
     */
    @Test
    fun timeoutsNotPreventingClosing(): Unit = runBlocking {
        val dispatcher = WorkerDispatcher("test")
        withContext(dispatcher) {
            // The block is empty, so it finishes immediately, long before the 5-second timeout.
            withTimeout(5.seconds) {
            }
        }
        withTimeout(1.seconds) {
            dispatcher.close() // should not wait for the timeout
            yield()
        }
    }
}