/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.scheduling.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

/**
 * Checks how coroutines behave when the executor backing their dispatcher throws
 * [RejectedExecutionException] from `schedule`.
 *
 * Every test uses a single [RejectingExecutor] that counts the tasks handed to it and
 * rejects each one beyond its `acceptTasks` budget.
 */
class RejectedExecutionTest : TestBase() {
    private val threadName = "RejectedExecutionTest"
    private val executor = RejectingExecutor()

    /** Shuts the executor down after each test and waits up to 10 seconds for it to terminate. */
    @After
    fun tearDown() {
        with(executor) {
            shutdown()
            awaitTermination(10, TimeUnit.SECONDS)
        }
    }

    /** With a zero budget the launched body must never run and the job must end up cancelled. */
    @Test
    fun testRejectOnLaunch() = runTest {
        expect(1)
        val rejecting = executor.asCoroutineDispatcher()
        val job = launch(rejecting) {
            expectUnreached()
        }
        assertEquals(1, executor.submittedTasks)
        assertTrue(job.isCancelled)
        finish(2)
    }

    /** With [CoroutineStart.ATOMIC] the body still runs, but it observes an already cancelled job. */
    @Test
    fun testRejectOnLaunchAtomic() = runTest {
        expect(1)
        val rejecting = executor.asCoroutineDispatcher()
        val job = launch(rejecting, start = CoroutineStart.ATOMIC) {
            expect(2)
            val currentJob = coroutineContext[Job]
            assertEquals(true, currentJob?.isCancelled)
            assertIoThread() // the start was rejected, but it was atomic, so the body runs anyway
        }
        assertEquals(1, executor.submittedTasks)
        job.join()
        finish(3)
    }

    /** Switching into the rejecting dispatcher fails with [CancellationException] before the block runs. */
    @Test
    fun testRejectOnWithContext() = runTest {
        expect(1)
        val rejecting = executor.asCoroutineDispatcher()
        assertFailsWith<CancellationException> {
            withContext(rejecting) {
                expectUnreached()
            }
        }
        assertEquals(1, executor.submittedTasks)
        finish(2)
    }

    /**
     * One task is accepted (the entry into the executor context); the attempt to resume back
     * into it from `Dispatchers.Default` is the second submission and is rejected.
     */
    @Test
    fun testRejectOnResumeInContext() = runTest {
        expect(1)
        executor.acceptTasks = 1 // the budget covers exactly one task
        val rejecting = executor.asCoroutineDispatcher()
        assertFailsWith<CancellationException> {
            withContext(rejecting) {
                expect(2)
                assertExecutorThread()
                try {
                    withContext(Dispatchers.Default) {
                        expect(3)
                        assertDefaultDispatcherThread()
                        // Wait until the calling executor thread has already suspended (i.e. it is not
                        // running a task), so that resuming back to it has to post a new task.
                        executor.awaitNotRunningTask()
                        expect(4)
                        assertDefaultDispatcherThread()
                    }
                    // cancelled while resuming back
                } finally {
                    expect(5)
                    assertIoThread()
                }
                expectUnreached()
            }
        }
        assertEquals(2, executor.submittedTasks)
        finish(6)
    }

    /** One task is accepted; the `delay` call is the second submission and gets rejected. */
    @Test
    fun testRejectOnDelay() = runTest {
        if (!removeFutureOnCancel(executor)) return@runTest // Skip this test on old JDKs
        expect(1)
        executor.acceptTasks = 1 // the budget covers exactly one task
        val rejecting = executor.asCoroutineDispatcher()
        assertFailsWith<CancellationException> {
            withContext(rejecting) {
                expect(2)
                assertExecutorThread()
                try {
                    delay(10) // cancelled
                } finally {
                    // Cancellation happened on the attempt to delay, so we are still on the same thread
                    assertExecutorThread()
                }
                expectUnreached()
            }
        }
        assertEquals(2, executor.submittedTasks)
        finish(3)
    }

    /** One task is accepted; `withTimeout` still enters its block, but with an already cancelled job. */
    @Test
    fun testRejectWithTimeout() = runTest {
        if (!removeFutureOnCancel(executor)) return@runTest // Skip this test on old JDKs
        expect(1)
        executor.acceptTasks = 1 // the budget covers exactly one task
        val rejecting = executor.asCoroutineDispatcher()
        assertFailsWith<CancellationException> {
            withContext(rejecting) {
                expect(2)
                assertExecutorThread()
                withTimeout(1000) {
                    expect(3) // atomic entry into the block (legacy behavior, it seems to be OK this way)
                    assertEquals(true, coroutineContext[Job]?.isCancelled) // but the job is already cancelled
                }
                expectUnreached()
            }
        }
        assertEquals(2, executor.submittedTasks)
        finish(4)
    }

    /**
     * Single-threaded scheduled executor whose worker thread is named [threadName].
     *
     * Each call to [schedule] increments [submittedTasks]; once that count exceeds [acceptTasks]
     * the call throws [RejectedExecutionException]. [runningTask] is `true` exactly while an
     * accepted task is executing.
     */
    private inner class RejectingExecutor : ScheduledThreadPoolExecutor(1, { r -> Thread(r, threadName) }) {
        var acceptTasks = 0
        var submittedTasks = 0
        val runningTask = MutableStateFlow(false)

        override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
            submittedTasks += 1
            if (acceptTasks < submittedTasks) throw RejectedExecutionException()
            return super.schedule(Runnable { runTracked(command) }, delay, unit)
        }

        /** Runs [task] with [runningTask] raised, lowering it again even if the task throws. */
        private fun runTracked(task: Runnable) {
            runningTask.value = true
            try {
                task.run()
            } finally {
                runningTask.value = false
            }
        }

        /** Suspends until [runningTask] reports that no task is executing. */
        suspend fun awaitNotRunningTask(): Boolean = runningTask.first { running -> !running }
    }

    /** Fails unless the current thread's name starts with [threadName]. */
    private fun assertExecutorThread() {
        val current = Thread.currentThread()
        check(current.name.startsWith(threadName)) { "Not an executor thread: $current" }
    }

    /** Fails unless the current thread is a [CoroutineScheduler.Worker] in the `CPU_ACQUIRED` state. */
    private fun assertDefaultDispatcherThread() {
        val current = Thread.currentThread()
        val worker = current as? CoroutineScheduler.Worker
            ?: error("Not a thread from Dispatchers.Default: $current")
        assertEquals(CoroutineScheduler.WorkerState.CPU_ACQUIRED, worker.state)
    }

    /** Fails unless the current thread is a [CoroutineScheduler.Worker] in the `BLOCKING` state. */
    private fun assertIoThread() {
        val current = Thread.currentThread()
        val worker = current as? CoroutineScheduler.Worker
            ?: error("Not a thread from Dispatchers.IO: $current")
        assertEquals(CoroutineScheduler.WorkerState.BLOCKING, worker.state)
    }
}