1 /* 2 * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import kotlinx.atomicfu.* 8 import kotlinx.coroutines.* 9 import kotlinx.coroutines.exceptions.* 10 import kotlin.coroutines.* 11 import kotlin.test.* 12 13 class LimitedParallelismConcurrentTest : TestBase() { 14 15 private val targetParallelism = 4 16 private val iterations = 100_000 17 private val parallelism = atomic(0) 18 checkParallelismnull19 private fun checkParallelism() { 20 val value = parallelism.incrementAndGet() 21 randomWait() 22 assertTrue { value <= targetParallelism } 23 parallelism.decrementAndGet() 24 } 25 26 @Test <lambda>null27 fun testLimitedExecutor() = runTest { 28 val executor = newFixedThreadPoolContext(targetParallelism, "test") 29 val view = executor.limitedParallelism(targetParallelism) 30 doStress { 31 repeat(iterations) { 32 launch(view) { 33 checkParallelism() 34 } 35 } 36 } 37 executor.close() 38 } 39 doStressnull40 private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) { 41 repeat(stressTestMultiplier) { 42 coroutineScope { 43 block() 44 } 45 } 46 } 47 48 @Test <lambda>null49 fun testTaskFairness() = runTest { 50 val executor = newSingleThreadContext("test") 51 val view = executor.limitedParallelism(1) 52 val view2 = executor.limitedParallelism(1) 53 val j1 = launch(view) { 54 while (true) { 55 yield() 56 } 57 } 58 val j2 = launch(view2) { j1.cancel() } 59 joinAll(j1, j2) 60 executor.close() 61 } 62 63 /** 64 * Tests that, when no tasks are present, the limited dispatcher does not dispatch any tasks. 65 * This is important for the case when a dispatcher is closeable and the [CoroutineDispatcher.limitedParallelism] 66 * machinery could trigger a dispatch after the dispatcher is closed. 67 */ 68 @Test <lambda>null69 fun testNotDoingDispatchesWhenNoTasksArePresent() = runTest { 70 class NaggingDispatcher: CoroutineDispatcher() { 71 val closed = atomic(false) 72 override fun dispatch(context: CoroutineContext, block: Runnable) { 73 if (closed.value) 74 fail("Dispatcher was closed, but still dispatched a task") 75 Dispatchers.Default.dispatch(context, block) 76 } 77 fun close() { 78 closed.value = true 79 } 80 } 81 repeat(stressTestMultiplier * 500_000) { 82 val dispatcher = NaggingDispatcher() 83 val view = dispatcher.limitedParallelism(1) 84 val deferred = CompletableDeferred<Unit>() 85 val job = launch(view) { 86 deferred.await() 87 } 88 launch(Dispatchers.Default) { 89 deferred.complete(Unit) 90 } 91 job.join() 92 dispatcher.close() 93 } 94 } 95 } 96