<lambda>null1 package kotlinx.coroutines.rx3
2
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.disposables.*
6 import io.reactivex.rxjava3.plugins.*
7 import io.reactivex.rxjava3.schedulers.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.sync.*
10 import org.junit.*
11 import org.junit.Test
12 import java.lang.Runnable
13 import java.util.concurrent.*
14 import java.util.concurrent.atomic.AtomicReference
15 import kotlin.coroutines.*
16 import kotlin.test.*
17
18 class SchedulerTest : TestBase() {
19 @Before
20 fun setup() {
21 ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
22 }
23
24 @Test
25 fun testIoScheduler(): Unit = runTest {
26 expect(1)
27 val mainThread = Thread.currentThread()
28 withContext(Schedulers.io().asCoroutineDispatcher()) {
29 val t1 = Thread.currentThread()
30 assertNotSame(t1, mainThread)
31 expect(2)
32 delay(100)
33 val t2 = Thread.currentThread()
34 assertNotSame(t2, mainThread)
35 expect(3)
36 }
37 finish(4)
38 }
39
40 /** Tests [toString] implementations of [CoroutineDispatcher.asScheduler] and its [Scheduler.Worker]. */
41 @Test
42 fun testSchedulerToString() {
43 val name = "Dispatchers.Default"
44 val scheduler = Dispatchers.Default.asScheduler()
45 assertContains(scheduler.toString(), name)
46 val worker = scheduler.createWorker()
47 val activeWorkerName = worker.toString()
48 assertContains(worker.toString(), name)
49 worker.dispose()
50 val disposedWorkerName = worker.toString()
51 assertNotEquals(activeWorkerName, disposedWorkerName)
52 }
53
54 private fun runSchedulerTest(nThreads: Int = 1, action: (Scheduler) -> Unit) {
55 val future = CompletableFuture<Unit>()
56 try {
57 newFixedThreadPoolContext(nThreads, "test").use { dispatcher ->
58 RxJavaPlugins.setErrorHandler {
59 if (!future.completeExceptionally(it)) {
60 handleUndeliverableException(it, dispatcher)
61 }
62 }
63 action(dispatcher.asScheduler())
64 }
65 } finally {
66 RxJavaPlugins.setErrorHandler(null)
67 }
68 future.complete(Unit)
69 future.getNow(Unit) // rethrow any encountered errors
70 }
71
72 private fun ensureSeparateThread(schedule: (Runnable, Long, TimeUnit) -> Unit, scheduleNoDelay: (Runnable) -> Unit) {
73 val mainThread = Thread.currentThread()
74 val cdl1 = CountDownLatch(1)
75 val cdl2 = CountDownLatch(1)
76 expect(1)
77 val thread = AtomicReference<Thread?>(null)
78 fun checkThread() {
79 val current = Thread.currentThread()
80 thread.getAndSet(current)?.let { assertEquals(it, current) }
81 }
82 schedule({
83 assertNotSame(mainThread, Thread.currentThread())
84 checkThread()
85 cdl2.countDown()
86 }, 300, TimeUnit.MILLISECONDS)
87 scheduleNoDelay {
88 expect(2)
89 checkThread()
90 assertNotSame(mainThread, Thread.currentThread())
91 cdl1.countDown()
92 }
93 cdl1.await()
94 cdl2.await()
95 finish(3)
96 }
97
98 /**
99 * Tests [Scheduler.scheduleDirect] for [CoroutineDispatcher.asScheduler] on a single-threaded dispatcher.
100 */
101 @Test
102 fun testSingleThreadedDispatcherDirect(): Unit = runSchedulerTest(1) {
103 ensureSeparateThread(it::scheduleDirect, it::scheduleDirect)
104 }
105
106 /**
107 * Tests [Scheduler.Worker.schedule] for [CoroutineDispatcher.asScheduler] running its tasks on the correct thread.
108 */
109 @Test
110 fun testSingleThreadedWorker(): Unit = runSchedulerTest(1) {
111 val worker = it.createWorker()
112 ensureSeparateThread(worker::schedule, worker::schedule)
113 }
114
115 private fun checkCancelling(schedule: (Runnable, Long, TimeUnit) -> Disposable) {
116 // cancel the task before it has a chance to run.
117 val handle1 = schedule({
118 throw IllegalStateException("should have been successfully cancelled")
119 }, 10_000, TimeUnit.MILLISECONDS)
120 handle1.dispose()
121 // cancel the task after it started running.
122 val cdl1 = CountDownLatch(1)
123 val cdl2 = CountDownLatch(1)
124 val handle2 = schedule({
125 cdl1.countDown()
126 cdl2.await()
127 if (Thread.interrupted())
128 throw IllegalStateException("cancelling the task should not interrupt the thread")
129 }, 100, TimeUnit.MILLISECONDS)
130 cdl1.await()
131 handle2.dispose()
132 cdl2.countDown()
133 }
134
135 /**
136 * Test cancelling [Scheduler.scheduleDirect] for [CoroutineDispatcher.asScheduler].
137 */
138 @Test
139 fun testCancellingDirect(): Unit = runSchedulerTest {
140 checkCancelling(it::scheduleDirect)
141 }
142
143 /**
144 * Test cancelling [Scheduler.Worker.schedule] for [CoroutineDispatcher.asScheduler].
145 */
146 @Test
147 fun testCancellingWorker(): Unit = runSchedulerTest {
148 val worker = it.createWorker()
149 checkCancelling(worker::schedule)
150 }
151
152 /**
153 * Test shutting down [CoroutineDispatcher.asScheduler].
154 */
155 @Test
156 fun testShuttingDown() {
157 val n = 5
158 runSchedulerTest(nThreads = n) { scheduler ->
159 val cdl1 = CountDownLatch(n)
160 val cdl2 = CountDownLatch(1)
161 val cdl3 = CountDownLatch(n)
162 repeat(n) {
163 scheduler.scheduleDirect {
164 cdl1.countDown()
165 try {
166 cdl2.await()
167 } catch (e: InterruptedException) {
168 // this is the expected outcome
169 cdl3.countDown()
170 }
171 }
172 }
173 cdl1.await()
174 scheduler.shutdown()
175 if (!cdl3.await(1, TimeUnit.SECONDS)) {
176 cdl2.countDown()
177 error("the tasks were not cancelled when the scheduler was shut down")
178 }
179 }
180 }
181
182 /** Tests that there are no uncaught exceptions if [Disposable.dispose] on a worker happens when tasks are present. */
183 @Test
184 fun testDisposingWorker() = runTest {
185 val dispatcher = currentDispatcher() as CoroutineDispatcher
186 val scheduler = dispatcher.asScheduler()
187 val worker = scheduler.createWorker()
188 yield() // so that the worker starts waiting on the channel
189 assertFalse(worker.isDisposed)
190 worker.dispose()
191 assertTrue(worker.isDisposed)
192 }
193
194 /** Tests trying to use a [Scheduler.Worker]/[Scheduler] after [Scheduler.Worker.dispose]/[Scheduler.shutdown]. */
195 @Test
196 fun testSchedulingAfterDisposing() = runSchedulerTest {
197 expect(1)
198 val worker = it.createWorker()
199 // use CDL to ensure that the worker has properly initialized
200 val cdl1 = CountDownLatch(1)
201 setScheduler(2, 3)
202 val disposable1 = worker.schedule {
203 cdl1.countDown()
204 }
205 cdl1.await()
206 expect(4)
207 assertFalse(disposable1.isDisposed)
208 setScheduler(6, -1)
209 // check that the worker automatically disposes of the tasks after being disposed
210 assertFalse(worker.isDisposed)
211 worker.dispose()
212 assertTrue(worker.isDisposed)
213 expect(5)
214 val disposable2 = worker.schedule {
215 expectUnreached()
216 }
217 assertTrue(disposable2.isDisposed)
218 setScheduler(7, 8)
219 // ensure that the scheduler still works
220 val cdl2 = CountDownLatch(1)
221 val disposable3 = it.scheduleDirect {
222 cdl2.countDown()
223 }
224 cdl2.await()
225 expect(9)
226 assertFalse(disposable3.isDisposed)
227 // check that the scheduler automatically disposes of the tasks after being shut down
228 it.shutdown()
229 setScheduler(10, -1)
230 val disposable4 = it.scheduleDirect {
231 expectUnreached()
232 }
233 assertTrue(disposable4.isDisposed)
234 RxJavaPlugins.setScheduleHandler(null)
235 finish(11)
236 }
237
238 @Test
239 fun testSchedulerWithNoDelay(): Unit = runTest {
240 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
241 testRunnableWithNoDelay(scheduler::scheduleDirect)
242 }
243
244 @Test
245 fun testSchedulerWorkerWithNoDelay(): Unit = runTest {
246 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
247 testRunnableWithNoDelay(scheduler.createWorker()::schedule)
248 }
249
250 private suspend fun testRunnableWithNoDelay(block: RxSchedulerBlockNoDelay) {
251 expect(1)
252 suspendCancellableCoroutine<Unit> {
253 block(Runnable {
254 expect(2)
255 it.resume(Unit)
256 })
257 }
258 yield()
259 finish(3)
260 }
261
262 @Test
263 fun testSchedulerWithDelay(): Unit = runTest {
264 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
265 testRunnableWithDelay(scheduler::scheduleDirect, 300)
266 }
267
268 @Test
269 fun testSchedulerWorkerWithDelay(): Unit = runTest {
270 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
271 testRunnableWithDelay(scheduler.createWorker()::schedule, 300)
272 }
273
274 @Test
275 fun testSchedulerWithZeroDelay(): Unit = runTest {
276 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
277 testRunnableWithDelay(scheduler::scheduleDirect)
278 }
279
280 @Test
281 fun testSchedulerWorkerWithZeroDelay(): Unit = runTest {
282 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
283 testRunnableWithDelay(scheduler.createWorker()::schedule)
284 }
285
286 private suspend fun testRunnableWithDelay(block: RxSchedulerBlockWithDelay, delayMillis: Long = 0) {
287 expect(1)
288 suspendCancellableCoroutine<Unit> {
289 block({
290 expect(2)
291 it.resume(Unit)
292 }, delayMillis, TimeUnit.MILLISECONDS)
293 }
294 finish(3)
295 }
296
297 @Test
298 fun testAsSchedulerWithNegativeDelay(): Unit = runTest {
299 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
300 testRunnableWithDelay(scheduler::scheduleDirect, -1)
301 }
302
303 @Test
304 fun testAsSchedulerWorkerWithNegativeDelay(): Unit = runTest {
305 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
306 testRunnableWithDelay(scheduler.createWorker()::schedule, -1)
307 }
308
309 @Test
310 fun testSchedulerImmediateDispose(): Unit = runTest {
311 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
312 testRunnableImmediateDispose(scheduler::scheduleDirect)
313 }
314
315 @Test
316 fun testSchedulerWorkerImmediateDispose(): Unit = runTest {
317 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
318 testRunnableImmediateDispose(scheduler.createWorker()::schedule)
319 }
320
321 private fun testRunnableImmediateDispose(block: RxSchedulerBlockNoDelay) {
322 val disposable = block {
323 expectUnreached()
324 }
325 disposable.dispose()
326 }
327
328 @Test
329 fun testConvertDispatcherToOriginalScheduler(): Unit = runTest {
330 val originalScheduler = Schedulers.io()
331 val dispatcher = originalScheduler.asCoroutineDispatcher()
332 val scheduler = dispatcher.asScheduler()
333 assertSame(originalScheduler, scheduler)
334 }
335
336 @Test
337 fun testConvertSchedulerToOriginalDispatcher(): Unit = runTest {
338 val originalDispatcher = currentDispatcher() as CoroutineDispatcher
339 val scheduler = originalDispatcher.asScheduler()
340 val dispatcher = scheduler.asCoroutineDispatcher()
341 assertSame(originalDispatcher, dispatcher)
342 }
343
344 @Test
345 fun testSchedulerExpectRxPluginsCall(): Unit = runTest {
346 val dispatcher = currentDispatcher() as CoroutineDispatcher
347 val scheduler = dispatcher.asScheduler()
348 testRunnableExpectRxPluginsCall(scheduler::scheduleDirect)
349 }
350
351 @Test
352 fun testSchedulerWorkerExpectRxPluginsCall(): Unit = runTest {
353 val dispatcher = currentDispatcher() as CoroutineDispatcher
354 val scheduler = dispatcher.asScheduler()
355 testRunnableExpectRxPluginsCall(scheduler.createWorker()::schedule)
356 }
357
358 private suspend fun testRunnableExpectRxPluginsCall(block: RxSchedulerBlockNoDelay) {
359 expect(1)
360 setScheduler(2, 4)
361 suspendCancellableCoroutine<Unit> {
362 block(Runnable {
363 expect(5)
364 it.resume(Unit)
365 })
366 expect(3)
367 }
368 RxJavaPlugins.setScheduleHandler(null)
369 finish(6)
370 }
371
372 @Test
373 fun testSchedulerExpectRxPluginsCallWithDelay(): Unit = runTest {
374 val dispatcher = currentDispatcher() as CoroutineDispatcher
375 val scheduler = dispatcher.asScheduler()
376 testRunnableExpectRxPluginsCallDelay(scheduler::scheduleDirect)
377 }
378
379 @Test
380 fun testSchedulerWorkerExpectRxPluginsCallWithDelay(): Unit = runTest {
381 val dispatcher = currentDispatcher() as CoroutineDispatcher
382 val scheduler = dispatcher.asScheduler()
383 val worker = scheduler.createWorker()
384 testRunnableExpectRxPluginsCallDelay(worker::schedule)
385 }
386
387 private suspend fun testRunnableExpectRxPluginsCallDelay(block: RxSchedulerBlockWithDelay) {
388 expect(1)
389 setScheduler(2, 4)
390 suspendCancellableCoroutine<Unit> {
391 block({
392 expect(5)
393 it.resume(Unit)
394 }, 10, TimeUnit.MILLISECONDS)
395 expect(3)
396 }
397 RxJavaPlugins.setScheduleHandler(null)
398 finish(6)
399 }
400
401 private fun setScheduler(expectedCountOnSchedule: Int, expectCountOnRun: Int) {
402 RxJavaPlugins.setScheduleHandler {
403 expect(expectedCountOnSchedule)
404 Runnable {
405 expect(expectCountOnRun)
406 it.run()
407 }
408 }
409 }
410
411 /**
412 * Tests that [Scheduler.Worker] runs all work sequentially.
413 */
414 @Test
415 fun testWorkerSequentialOrdering() = runTest {
416 expect(1)
417 val scheduler = Dispatchers.Default.asScheduler()
418 val worker = scheduler.createWorker()
419 val iterations = 100
420 for (i in 0..iterations) {
421 worker.schedule {
422 expect(2 + i)
423 }
424 }
425 suspendCoroutine<Unit> {
426 worker.schedule {
427 it.resume(Unit)
428 }
429 }
430 finish((iterations + 2) + 1)
431 }
432
433 /**
434 * Test that ensures that delays are actually respected (tasks scheduled sooner in the future run before tasks scheduled later,
435 * even when the later task is submitted before the earlier one)
436 */
437 @Test
438 fun testSchedulerRespectsDelays(): Unit = runTest {
439 val scheduler = Dispatchers.Default.asScheduler()
440 testRunnableRespectsDelays(scheduler::scheduleDirect)
441 }
442
443 @Test
444 fun testSchedulerWorkerRespectsDelays(): Unit = runTest {
445 val scheduler = Dispatchers.Default.asScheduler()
446 testRunnableRespectsDelays(scheduler.createWorker()::schedule)
447 }
448
449 private suspend fun testRunnableRespectsDelays(block: RxSchedulerBlockWithDelay) {
450 expect(1)
451 val semaphore = Semaphore(2, 2)
452 block({
453 expect(3)
454 semaphore.release()
455 }, 100, TimeUnit.MILLISECONDS)
456 block({
457 expect(2)
458 semaphore.release()
459 }, 1, TimeUnit.MILLISECONDS)
460 semaphore.acquire()
461 semaphore.acquire()
462 finish(4)
463 }
464
465 /**
466 * Tests that cancelling a runnable in one worker doesn't affect work in another scheduler.
467 *
468 * This is part of expected behavior documented.
469 */
470 @Test
471 fun testMultipleWorkerCancellation(): Unit = runTest {
472 expect(1)
473 val dispatcher = currentDispatcher() as CoroutineDispatcher
474 val scheduler = dispatcher.asScheduler()
475 suspendCancellableCoroutine<Unit> {
476 val workerOne = scheduler.createWorker()
477 workerOne.schedule({
478 expect(3)
479 it.resume(Unit)
480 }, 50, TimeUnit.MILLISECONDS)
481 val workerTwo = scheduler.createWorker()
482 workerTwo.schedule({
483 expectUnreached()
484 }, 1000, TimeUnit.MILLISECONDS)
485 workerTwo.dispose()
486 expect(2)
487 }
488 finish(4)
489 }
490 }
491
492 typealias RxSchedulerBlockNoDelay = (Runnable) -> Disposable
493 typealias RxSchedulerBlockWithDelay = (Runnable, Long, TimeUnit) -> Disposable