• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import io.reactivex.plugins.*
10 import io.reactivex.schedulers.*
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.sync.*
13 import org.junit.*
14 import org.junit.Test
15 import java.lang.Runnable
16 import java.util.concurrent.*
17 import java.util.concurrent.atomic.AtomicReference
18 import kotlin.coroutines.*
19 import kotlin.test.*
20 
21 class SchedulerTest : TestBase() {
22     @Before
23     fun setup() {
24         ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
25     }
26 
27     @Test
28     fun testIoScheduler(): Unit = runTest {
29         expect(1)
30         val mainThread = Thread.currentThread()
31         withContext(Schedulers.io().asCoroutineDispatcher()) {
32             val t1 = Thread.currentThread()
33             assertNotSame(t1, mainThread)
34             expect(2)
35             delay(100)
36             val t2 = Thread.currentThread()
37             assertNotSame(t2, mainThread)
38             expect(3)
39         }
40         finish(4)
41     }
42 
43     /** Tests [toString] implementations of [CoroutineDispatcher.asScheduler] and its [Scheduler.Worker]. */
44     @Test
45     fun testSchedulerToString() {
46         val name = "Dispatchers.Default"
47         val scheduler = Dispatchers.Default.asScheduler()
48         assertContains(scheduler.toString(), name)
49         val worker = scheduler.createWorker()
50         val activeWorkerName = worker.toString()
51         assertContains(worker.toString(), name)
52         worker.dispose()
53         val disposedWorkerName = worker.toString()
54         assertNotEquals(activeWorkerName, disposedWorkerName)
55     }
56 
57     private fun runSchedulerTest(nThreads: Int = 1, action: (Scheduler) -> Unit) {
58         val future = CompletableFuture<Unit>()
59         try {
60             newFixedThreadPoolContext(nThreads, "test").use { dispatcher ->
61                 RxJavaPlugins.setErrorHandler {
62                     if (!future.completeExceptionally(it)) {
63                         handleUndeliverableException(it, dispatcher)
64                     }
65                 }
66                 action(dispatcher.asScheduler())
67             }
68         } finally {
69             RxJavaPlugins.setErrorHandler(null)
70         }
71         future.complete(Unit)
72         future.getNow(Unit) // rethrow any encountered errors
73     }
74 
75     private fun ensureSeparateThread(schedule: (Runnable, Long, TimeUnit) -> Unit, scheduleNoDelay: (Runnable) -> Unit) {
76         val mainThread = Thread.currentThread()
77         val cdl1 = CountDownLatch(1)
78         val cdl2 = CountDownLatch(1)
79         expect(1)
80         val thread = AtomicReference<Thread?>(null)
81         fun checkThread() {
82             val current = Thread.currentThread()
83             thread.getAndSet(current)?.let { assertEquals(it, current) }
84         }
85         schedule({
86             assertNotSame(mainThread, Thread.currentThread())
87             checkThread()
88             cdl2.countDown()
89         }, 300, TimeUnit.MILLISECONDS)
90         scheduleNoDelay {
91             expect(2)
92             checkThread()
93             assertNotSame(mainThread, Thread.currentThread())
94             cdl1.countDown()
95         }
96         cdl1.await()
97         cdl2.await()
98         finish(3)
99     }
100 
101     /**
102      * Tests [Scheduler.scheduleDirect] for [CoroutineDispatcher.asScheduler] on a single-threaded dispatcher.
103      */
104     @Test
105     fun testSingleThreadedDispatcherDirect(): Unit = runSchedulerTest(1) {
106         ensureSeparateThread(it::scheduleDirect, it::scheduleDirect)
107     }
108 
109     /**
110      * Tests [Scheduler.Worker.schedule] for [CoroutineDispatcher.asScheduler] running its tasks on the correct thread.
111      */
112     @Test
113     fun testSingleThreadedWorker(): Unit = runSchedulerTest(1) {
114         val worker = it.createWorker()
115         ensureSeparateThread(worker::schedule, worker::schedule)
116     }
117 
118     private fun checkCancelling(schedule: (Runnable, Long, TimeUnit) -> Disposable) {
119         // cancel the task before it has a chance to run.
120         val handle1 = schedule({
121             throw IllegalStateException("should have been successfully cancelled")
122         }, 10_000, TimeUnit.MILLISECONDS)
123         handle1.dispose()
124         // cancel the task after it started running.
125         val cdl1 = CountDownLatch(1)
126         val cdl2 = CountDownLatch(1)
127         val handle2 = schedule({
128             cdl1.countDown()
129             cdl2.await()
130             if (Thread.interrupted())
131                 throw IllegalStateException("cancelling the task should not interrupt the thread")
132         }, 100, TimeUnit.MILLISECONDS)
133         cdl1.await()
134         handle2.dispose()
135         cdl2.countDown()
136     }
137 
138     /**
139      * Test cancelling [Scheduler.scheduleDirect] for [CoroutineDispatcher.asScheduler].
140      */
141     @Test
142     fun testCancellingDirect(): Unit = runSchedulerTest {
143         checkCancelling(it::scheduleDirect)
144     }
145 
146     /**
147      * Test cancelling [Scheduler.Worker.schedule] for [CoroutineDispatcher.asScheduler].
148      */
149     @Test
150     fun testCancellingWorker(): Unit = runSchedulerTest {
151         val worker = it.createWorker()
152         checkCancelling(worker::schedule)
153     }
154 
155     /**
156      * Test shutting down [CoroutineDispatcher.asScheduler].
157      */
158     @Test
159     fun testShuttingDown() {
160         val n = 5
161         runSchedulerTest(nThreads = n) { scheduler ->
162             val cdl1 = CountDownLatch(n)
163             val cdl2 = CountDownLatch(1)
164             val cdl3 = CountDownLatch(n)
165             repeat(n) {
166                 scheduler.scheduleDirect {
167                     cdl1.countDown()
168                     try {
169                         cdl2.await()
170                     } catch (e: InterruptedException) {
171                         // this is the expected outcome
172                         cdl3.countDown()
173                     }
174                 }
175             }
176             cdl1.await()
177             scheduler.shutdown()
178             if (!cdl3.await(1, TimeUnit.SECONDS)) {
179                 cdl2.countDown()
180                 error("the tasks were not cancelled when the scheduler was shut down")
181             }
182         }
183     }
184 
185     /** Tests that there are no uncaught exceptions if [Disposable.dispose] on a worker happens when tasks are present. */
186     @Test
187     fun testDisposingWorker() = runTest {
188         val dispatcher = currentDispatcher() as CoroutineDispatcher
189         val scheduler = dispatcher.asScheduler()
190         val worker = scheduler.createWorker()
191         yield() // so that the worker starts waiting on the channel
192         assertFalse(worker.isDisposed)
193         worker.dispose()
194         assertTrue(worker.isDisposed)
195     }
196 
197     /** Tests trying to use a [Scheduler.Worker]/[Scheduler] after [Scheduler.Worker.dispose]/[Scheduler.shutdown]. */
198     @Test
199     fun testSchedulingAfterDisposing() = runSchedulerTest {
200         expect(1)
201         val worker = it.createWorker()
202         // use CDL to ensure that the worker has properly initialized
203         val cdl1 = CountDownLatch(1)
204         setScheduler(2, 3)
205         val disposable1 = worker.schedule {
206             cdl1.countDown()
207         }
208         cdl1.await()
209         expect(4)
210         assertFalse(disposable1.isDisposed)
211         setScheduler(6, -1)
212         // check that the worker automatically disposes of the tasks after being disposed
213         assertFalse(worker.isDisposed)
214         worker.dispose()
215         assertTrue(worker.isDisposed)
216         expect(5)
217         val disposable2 = worker.schedule {
218             expectUnreached()
219         }
220         assertTrue(disposable2.isDisposed)
221         setScheduler(7, 8)
222         // ensure that the scheduler still works
223         val cdl2 = CountDownLatch(1)
224         val disposable3 = it.scheduleDirect {
225             cdl2.countDown()
226         }
227         cdl2.await()
228         expect(9)
229         assertFalse(disposable3.isDisposed)
230         // check that the scheduler automatically disposes of the tasks after being shut down
231         it.shutdown()
232         setScheduler(10, -1)
233         val disposable4 = it.scheduleDirect {
234             expectUnreached()
235         }
236         assertTrue(disposable4.isDisposed)
237         RxJavaPlugins.setScheduleHandler(null)
238         finish(11)
239     }
240 
241     @Test
242     fun testSchedulerWithNoDelay(): Unit = runTest {
243         val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
244         testRunnableWithNoDelay(scheduler::scheduleDirect)
245     }
246 
247     @Test
248     fun testSchedulerWorkerWithNoDelay(): Unit = runTest {
249         val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
250         testRunnableWithNoDelay(scheduler.createWorker()::schedule)
251     }
252 
253     private suspend fun testRunnableWithNoDelay(block: RxSchedulerBlockNoDelay) {
254         expect(1)
255         suspendCancellableCoroutine<Unit> {
256             block(Runnable {
257                 expect(2)
258                 it.resume(Unit)
259             })
260         }
261         yield()
262         finish(3)
263     }
264 
265     @Test
266     fun testSchedulerWithDelay(): Unit = runTest {
267         val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
268         testRunnableWithDelay(scheduler::scheduleDirect, 300)
269     }
270 
271     @Test
272     fun testSchedulerWorkerWithDelay(): Unit = runTest {
273         val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
274         testRunnableWithDelay(scheduler.createWorker()::schedule, 300)
275     }
276 
277     @Test
278     fun testSchedulerWithZeroDelay(): Unit = runTest {
279         val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
280         testRunnableWithDelay(scheduler::scheduleDirect)
281     }
282 
283     @Test
284     fun testSchedulerWorkerWithZeroDelay(): Unit = runTest {
285         val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
286         testRunnableWithDelay(scheduler.createWorker()::schedule)
287     }
288 
289     private suspend fun testRunnableWithDelay(block: RxSchedulerBlockWithDelay, delayMillis: Long = 0) {
290         expect(1)
291         suspendCancellableCoroutine<Unit> {
292             block({
293                 expect(2)
294                 it.resume(Unit)
295             }, delayMillis, TimeUnit.MILLISECONDS)
296         }
297         finish(3)
298     }
299 
300     @Test
301     fun testAsSchedulerWithNegativeDelay(): Unit = runTest {
302         val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
303         testRunnableWithDelay(scheduler::scheduleDirect, -1)
304     }
305 
306     @Test
307     fun testAsSchedulerWorkerWithNegativeDelay(): Unit = runTest {
308         val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
309         testRunnableWithDelay(scheduler.createWorker()::schedule, -1)
310     }
311 
312     @Test
313     fun testSchedulerImmediateDispose(): Unit = runTest {
314         val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
315         testRunnableImmediateDispose(scheduler::scheduleDirect)
316     }
317 
318     @Test
319     fun testSchedulerWorkerImmediateDispose(): Unit = runTest {
320         val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
321         testRunnableImmediateDispose(scheduler.createWorker()::schedule)
322     }
323 
324     private fun testRunnableImmediateDispose(block: RxSchedulerBlockNoDelay) {
325         val disposable = block {
326             expectUnreached()
327         }
328         disposable.dispose()
329     }
330 
331     @Test
332     fun testConvertDispatcherToOriginalScheduler(): Unit = runTest {
333         val originalScheduler = Schedulers.io()
334         val dispatcher = originalScheduler.asCoroutineDispatcher()
335         val scheduler = dispatcher.asScheduler()
336         assertSame(originalScheduler, scheduler)
337     }
338 
339     @Test
340     fun testConvertSchedulerToOriginalDispatcher(): Unit = runTest {
341         val originalDispatcher = currentDispatcher() as CoroutineDispatcher
342         val scheduler = originalDispatcher.asScheduler()
343         val dispatcher = scheduler.asCoroutineDispatcher()
344         assertSame(originalDispatcher, dispatcher)
345     }
346 
347     @Test
348     fun testSchedulerExpectRxPluginsCall(): Unit = runTest {
349         val dispatcher = currentDispatcher() as CoroutineDispatcher
350         val scheduler = dispatcher.asScheduler()
351         testRunnableExpectRxPluginsCall(scheduler::scheduleDirect)
352     }
353 
354     @Test
355     fun testSchedulerWorkerExpectRxPluginsCall(): Unit = runTest {
356         val dispatcher = currentDispatcher() as CoroutineDispatcher
357         val scheduler = dispatcher.asScheduler()
358         testRunnableExpectRxPluginsCall(scheduler.createWorker()::schedule)
359     }
360 
361     private suspend fun testRunnableExpectRxPluginsCall(block: RxSchedulerBlockNoDelay) {
362         expect(1)
363         setScheduler(2, 4)
364         suspendCancellableCoroutine<Unit> {
365             block(Runnable {
366                 expect(5)
367                 it.resume(Unit)
368             })
369             expect(3)
370         }
371         RxJavaPlugins.setScheduleHandler(null)
372         finish(6)
373     }
374 
375     @Test
376     fun testSchedulerExpectRxPluginsCallWithDelay(): Unit = runTest {
377         val dispatcher = currentDispatcher() as CoroutineDispatcher
378         val scheduler = dispatcher.asScheduler()
379         testRunnableExpectRxPluginsCallDelay(scheduler::scheduleDirect)
380     }
381 
382     @Test
383     fun testSchedulerWorkerExpectRxPluginsCallWithDelay(): Unit = runTest {
384         val dispatcher = currentDispatcher() as CoroutineDispatcher
385         val scheduler = dispatcher.asScheduler()
386         val worker = scheduler.createWorker()
387         testRunnableExpectRxPluginsCallDelay(worker::schedule)
388     }
389 
390     private suspend fun testRunnableExpectRxPluginsCallDelay(block: RxSchedulerBlockWithDelay) {
391         expect(1)
392         setScheduler(2, 4)
393         suspendCancellableCoroutine<Unit> {
394             block({
395                 expect(5)
396                 it.resume(Unit)
397             }, 10, TimeUnit.MILLISECONDS)
398             expect(3)
399         }
400         RxJavaPlugins.setScheduleHandler(null)
401         finish(6)
402     }
403 
404     private fun setScheduler(expectedCountOnSchedule: Int, expectCountOnRun: Int) {
405         RxJavaPlugins.setScheduleHandler {
406             expect(expectedCountOnSchedule)
407             Runnable {
408                 expect(expectCountOnRun)
409                 it.run()
410             }
411         }
412     }
413 
414     /**
415      * Tests that [Scheduler.Worker] runs all work sequentially.
416      */
417     @Test
418     fun testWorkerSequentialOrdering() = runTest {
419         expect(1)
420         val scheduler = Dispatchers.Default.asScheduler()
421         val worker = scheduler.createWorker()
422         val iterations = 100
423         for (i in 0..iterations) {
424             worker.schedule {
425                 expect(2 + i)
426             }
427         }
428         suspendCoroutine<Unit> {
429             worker.schedule {
430                 it.resume(Unit)
431             }
432         }
433         finish((iterations + 2) + 1)
434     }
435 
436     /**
437      * Test that ensures that delays are actually respected (tasks scheduled sooner in the future run before tasks scheduled later,
438      * even when the later task is submitted before the earlier one)
439      */
440     @Test
441     fun testSchedulerRespectsDelays(): Unit = runTest {
442         val scheduler = Dispatchers.Default.asScheduler()
443         testRunnableRespectsDelays(scheduler::scheduleDirect)
444     }
445 
446     @Test
447     fun testSchedulerWorkerRespectsDelays(): Unit = runTest {
448         val scheduler = Dispatchers.Default.asScheduler()
449         testRunnableRespectsDelays(scheduler.createWorker()::schedule)
450     }
451 
452     private suspend fun testRunnableRespectsDelays(block: RxSchedulerBlockWithDelay) {
453         expect(1)
454         val semaphore = Semaphore(2, 2)
455         block({
456             expect(3)
457             semaphore.release()
458         }, 100, TimeUnit.MILLISECONDS)
459         block({
460             expect(2)
461             semaphore.release()
462         }, 1, TimeUnit.MILLISECONDS)
463         semaphore.acquire()
464         semaphore.acquire()
465         finish(4)
466     }
467 
468     /**
469      * Tests that cancelling a runnable in one worker doesn't affect work in another scheduler.
470      *
471      * This is part of expected behavior documented.
472      */
473     @Test
474     fun testMultipleWorkerCancellation(): Unit = runTest {
475         expect(1)
476         val dispatcher = currentDispatcher() as CoroutineDispatcher
477         val scheduler = dispatcher.asScheduler()
478         suspendCancellableCoroutine<Unit> {
479             val workerOne = scheduler.createWorker()
480             workerOne.schedule({
481                 expect(3)
482                 it.resume(Unit)
483             }, 50, TimeUnit.MILLISECONDS)
484             val workerTwo = scheduler.createWorker()
485             workerTwo.schedule({
486                 expectUnreached()
487             }, 1000, TimeUnit.MILLISECONDS)
488             workerTwo.dispose()
489             expect(2)
490         }
491         finish(4)
492     }
493 }
494 
/** A function that schedules a [Runnable] without a delay and returns the [Disposable] of the scheduled task. */
typealias RxSchedulerBlockNoDelay = (Runnable) -> Disposable
/** A function that schedules a [Runnable] with a delay (amount and [TimeUnit]) and returns the [Disposable] of the scheduled task. */
typealias RxSchedulerBlockWithDelay = (Runnable, Long, TimeUnit) -> Disposable