• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.flow.*
8 import kotlinx.coroutines.internal.*
9 import kotlinx.coroutines.scheduling.*
10 import org.junit.*
11 import org.junit.Test
12 import java.util.concurrent.*
13 import kotlin.test.*
14 
/**
 * Checks how coroutines behave when the executor behind their dispatcher rejects a task by
 * throwing [RejectedExecutionException].
 *
 * All tests use [executor], a [RejectingExecutor] that counts every submission attempt and
 * accepts only the first `acceptTasks` of them (none unless a test raises the limit).
 */
class RejectedExecutionTest : TestBase() {
    // Name given to the executor's thread; assertExecutorThread() matches on it.
    private val threadName = "RejectedExecutionTest"
    private val executor = RejectingExecutor()

    /** Shuts the executor down after each test and fails if it does not terminate in time. */
    @After
    fun tearDown() {
        executor.shutdown()
        // awaitTermination reports a timeout only through its return value, so assert on it:
        // otherwise an executor thread that never finishes would be leaked silently.
        assertTrue(
            executor.awaitTermination(10, TimeUnit.SECONDS),
            "Executor did not terminate within 10 seconds"
        )
    }

    /**
     * `launch` on a dispatcher whose executor rejects the task: the body never runs,
     * exactly one submission attempt is recorded and the returned job is cancelled.
     */
    @Test
    fun testRejectOnLaunch() = runTest {
        expect(1)
        val job = launch(executor.asCoroutineDispatcher()) {
            expectUnreached()
        }
        assertEquals(1, executor.submittedTasks)
        assertTrue(job.isCancelled)
        finish(2)
    }

    /**
     * Same as [testRejectOnLaunch], but with [CoroutineStart.ATOMIC]: the body still runs,
     * with its job already cancelled, on a thread accepted by [assertIoThread].
     */
    @Test
    fun testRejectOnLaunchAtomic() = runTest {
        expect(1)
        val job = launch(executor.asCoroutineDispatcher(), start = CoroutineStart.ATOMIC) {
            expect(2)
            assertEquals(true, coroutineContext[Job]?.isCancelled)
            assertIoThread() // was rejected on start, but start was atomic
        }
        assertEquals(1, executor.submittedTasks)
        job.join()
        finish(3)
    }

    /**
     * `withContext` into a dispatcher whose executor rejects the task fails with
     * [CancellationException] without running the block.
     */
    @Test
    fun testRejectOnWithContext() = runTest {
        expect(1)
        assertFailsWith<CancellationException> {
            withContext(executor.asCoroutineDispatcher()) {
                expectUnreached()
            }
        }
        assertEquals(1, executor.submittedTasks)
        finish(2)
    }

    /**
     * The executor accepts the initial dispatch but rejects the dispatch needed to resume on it
     * after an inner `withContext(Dispatchers.Default)`: the outer block is cancelled and its
     * `finally` runs on a thread accepted by [assertIoThread].
     */
    @Test
    fun testRejectOnResumeInContext() = runTest {
        expect(1)
        executor.acceptTasks = 1 // accept one task
        assertFailsWith<CancellationException> {
            withContext(executor.asCoroutineDispatcher()) {
                expect(2)
                assertExecutorThread()
                try {
                    withContext(Dispatchers.Default) {
                        expect(3)
                        assertDefaultDispatcherThread()
                        // Wait until the caller's executor thread is no longer running the task
                        // (i.e. it has already suspended), so that resuming back to it requires
                        // posting a new task.
                        executor.awaitNotRunningTask()
                        expect(4)
                        assertDefaultDispatcherThread()
                    }
                    // cancelled on resume back
                } finally {
                    expect(5)
                    assertIoThread()
                }
                expectUnreached()
            }
        }
        assertEquals(2, executor.submittedTasks)
        finish(6)
    }

    /**
     * The executor accepts the initial dispatch but rejects the task submitted by `delay`:
     * the coroutine is cancelled and its `finally` still runs on the executor thread.
     */
    @Test
    fun testRejectOnDelay() = runTest {
        if (!removeFutureOnCancel(executor)) return@runTest // Skip this test on old JDKs
        expect(1)
        executor.acceptTasks = 1 // accept one task
        assertFailsWith<CancellationException> {
            withContext(executor.asCoroutineDispatcher()) {
                expect(2)
                assertExecutorThread()
                try {
                    delay(10) // cancelled
                } finally {
                    // Since it was cancelled on attempt to delay, it still stays on the same thread
                    assertExecutorThread()
                }
                expectUnreached()
            }
        }
        assertEquals(2, executor.submittedTasks)
        finish(3)
    }

    /**
     * The executor accepts the initial dispatch but rejects the task submitted by `withTimeout`:
     * the timeout block is still entered, with its job already cancelled, and the outer
     * `withContext` fails with [CancellationException].
     */
    @Test
    fun testRejectWithTimeout() = runTest {
        if (!removeFutureOnCancel(executor)) return@runTest // Skip this test on old JDKs
        expect(1)
        executor.acceptTasks = 1 // accept one task
        assertFailsWith<CancellationException> {
            withContext(executor.asCoroutineDispatcher()) {
                expect(2)
                assertExecutorThread()
                withTimeout(1000) {
                    expect(3) // atomic entry into the block (legacy behavior, it seems to be OK this way)
                    assertEquals(true, coroutineContext[Job]?.isCancelled) // but the job is already cancelled
                }
                expectUnreached()
            }
        }
        assertEquals(2, executor.submittedTasks)
        finish(4)
    }

    /**
     * Single-threaded scheduled executor that counts every call to [schedule] and throws
     * [RejectedExecutionException] once the count exceeds [acceptTasks].
     *
     * NOTE(review): the tests expect plain dispatches to be counted too, which presumably relies
     * on `ScheduledThreadPoolExecutor.execute` delegating to `schedule` — confirm against the JDK.
     */
    private inner class RejectingExecutor : ScheduledThreadPoolExecutor(1, { r -> Thread(r, threadName) }) {
        /** How many submissions are accepted before the executor starts rejecting. */
        var acceptTasks = 0

        /** Number of [schedule] calls made so far, rejected ones included. */
        var submittedTasks = 0

        /** `true` while an accepted command is running inside its wrapper. */
        val runningTask = MutableStateFlow(false)

        /**
         * Counts the submission and either rejects it or schedules [command] wrapped so that
         * [runningTask] is `true` for exactly the duration of its `run()`.
         *
         * @throws RejectedExecutionException when more than [acceptTasks] tasks have been submitted.
         */
        override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
            submittedTasks++
            if (submittedTasks > acceptTasks) throw RejectedExecutionException()
            val wrapper = Runnable {
                runningTask.value = true
                try {
                    command.run()
                } finally {
                    // Reset even if the command throws, so awaitNotRunningTask() cannot hang.
                    runningTask.value = false
                }
            }
            return super.schedule(wrapper, delay, unit)
        }

        /** Suspends until [runningTask] is `false`. */
        suspend fun awaitNotRunningTask() = runningTask.first { !it }
    }

    /** Fails unless the current thread's name starts with [threadName]. */
    private fun assertExecutorThread() {
        val thread = Thread.currentThread()
        if (!thread.name.startsWith(threadName)) error("Not an executor thread: $thread")
    }

    /** Fails unless the current thread is a [CoroutineScheduler.Worker] in the `CPU_ACQUIRED` state. */
    private fun assertDefaultDispatcherThread() {
        val thread = Thread.currentThread()
        if (thread !is CoroutineScheduler.Worker) error("Not a thread from Dispatchers.Default: $thread")
        assertEquals(CoroutineScheduler.WorkerState.CPU_ACQUIRED, thread.state)
    }

    /** Fails unless the current thread is a [CoroutineScheduler.Worker] in the `BLOCKING` state. */
    private fun assertIoThread() {
        val thread = Thread.currentThread()
        if (thread !is CoroutineScheduler.Worker) error("Not a thread from Dispatchers.IO: $thread")
        assertEquals(CoroutineScheduler.WorkerState.BLOCKING, thread.state)
    }
}