• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.disposables.*
6 import io.reactivex.rxjava3.plugins.*
7 import io.reactivex.rxjava3.schedulers.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.sync.*
10 import org.junit.*
11 import org.junit.Test
12 import java.lang.Runnable
13 import java.util.concurrent.*
14 import java.util.concurrent.atomic.AtomicReference
15 import kotlin.coroutines.*
16 import kotlin.test.*
17 
/**
 * Tests for the conversions between RxJava 3 [Scheduler]s and [CoroutineDispatcher]s:
 * [Scheduler.asCoroutineDispatcher] and [CoroutineDispatcher.asScheduler].
 */
class SchedulerTest : TestBase() {
    /**
     * Registers the thread-name prefixes used by the tests below with [ignoreLostThreads] before each test.
     */
    @Before
    fun setup() {
        ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * Checks that code running in a dispatcher made from [Schedulers.io] does not run on the thread
     * that started the test, both before and after a [delay].
     */
    @Test
    fun testIoScheduler(): Unit = runTest {
        expect(1)
        val mainThread = Thread.currentThread()
        withContext(Schedulers.io().asCoroutineDispatcher()) {
            val t1 = Thread.currentThread()
            assertNotSame(t1, mainThread)
            expect(2)
            delay(100)
            val t2 = Thread.currentThread()
            assertNotSame(t2, mainThread)
            expect(3)
        }
        finish(4)
    }

    /** Tests [toString] implementations of [CoroutineDispatcher.asScheduler] and its [Scheduler.Worker]. */
    @Test
    fun testSchedulerToString() {
        val name = "Dispatchers.Default"
        val scheduler = Dispatchers.Default.asScheduler()
        assertContains(scheduler.toString(), name)
        val worker = scheduler.createWorker()
        val activeWorkerName = worker.toString()
        assertContains(activeWorkerName, name)
        worker.dispose()
        // The string representation is expected to change once the worker is disposed.
        val disposedWorkerName = worker.toString()
        assertNotEquals(activeWorkerName, disposedWorkerName)
    }

    /**
     * Runs [action] with a [Scheduler] created from a fresh fixed thread pool of [nThreads] threads named "test".
     *
     * While [action] runs, the global RxJava error handler is replaced: the first error it receives is stored
     * in a future, and any further errors are passed to [handleUndeliverableException]. The handler is reset
     * even if [action] throws. After the pool is closed, a stored error (if any) is rethrown.
     */
    private fun runSchedulerTest(nThreads: Int = 1, action: (Scheduler) -> Unit) {
        val future = CompletableFuture<Unit>()
        try {
            newFixedThreadPoolContext(nThreads, "test").use { dispatcher ->
                RxJavaPlugins.setErrorHandler {
                    // only the first error can complete the future; the rest are reported the usual way
                    if (!future.completeExceptionally(it)) {
                        handleUndeliverableException(it, dispatcher)
                    }
                }
                action(dispatcher.asScheduler())
            }
        } finally {
            RxJavaPlugins.setErrorHandler(null)
        }
        // a no-op if the error handler has already completed the future exceptionally
        future.complete(Unit)
        future.getNow(Unit) // rethrow any encountered errors
    }

    /**
     * Schedules one delayed task via [schedule] and one immediate task via [scheduleNoDelay] and checks that
     * neither runs on the calling thread and that both run on the same thread as each other.
     *
     * The latches are released in `finally` blocks so that a failed assertion inside a task makes the test
     * fail instead of leaving the calling thread blocked in `await()` forever.
     */
    private fun ensureSeparateThread(schedule: (Runnable, Long, TimeUnit) -> Unit, scheduleNoDelay: (Runnable) -> Unit) {
        val mainThread = Thread.currentThread()
        val cdl1 = CountDownLatch(1)
        val cdl2 = CountDownLatch(1)
        expect(1)
        val thread = AtomicReference<Thread?>(null)
        // Remembers the current thread and, if another task has already run, checks it used the same thread.
        fun checkThread() {
            val current = Thread.currentThread()
            thread.getAndSet(current)?.let { assertEquals(it, current) }
        }
        schedule({
            try {
                assertNotSame(mainThread, Thread.currentThread())
                checkThread()
            } finally {
                cdl2.countDown()
            }
        }, 300, TimeUnit.MILLISECONDS)
        scheduleNoDelay {
            try {
                expect(2)
                checkThread()
                assertNotSame(mainThread, Thread.currentThread())
            } finally {
                cdl1.countDown()
            }
        }
        cdl1.await()
        cdl2.await()
        finish(3)
    }

    /**
     * Tests [Scheduler.scheduleDirect] for [CoroutineDispatcher.asScheduler] on a single-threaded dispatcher.
     */
    @Test
    fun testSingleThreadedDispatcherDirect(): Unit = runSchedulerTest(1) { scheduler ->
        ensureSeparateThread(scheduler::scheduleDirect, scheduler::scheduleDirect)
    }

    /**
     * Tests [Scheduler.Worker.schedule] for [CoroutineDispatcher.asScheduler] running its tasks on the correct thread.
     */
    @Test
    fun testSingleThreadedWorker(): Unit = runSchedulerTest(1) { scheduler ->
        val worker = scheduler.createWorker()
        ensureSeparateThread(worker::schedule, worker::schedule)
    }

    /**
     * Checks two cancellation scenarios for [schedule]: disposing of a task long before its delay expires,
     * and disposing of a task while it is already running. In the latter case the thread running the task
     * must not be interrupted.
     */
    private fun checkCancelling(schedule: (Runnable, Long, TimeUnit) -> Disposable) {
        // cancel the task before it has a chance to run.
        val handle1 = schedule({
            throw IllegalStateException("should have been successfully cancelled")
        }, 10_000, TimeUnit.MILLISECONDS)
        handle1.dispose()
        // cancel the task after it started running.
        val cdl1 = CountDownLatch(1)
        val cdl2 = CountDownLatch(1)
        val handle2 = schedule({
            cdl1.countDown() // signal that the task is running
            cdl2.await() // stay inside the task until it has been disposed of
            if (Thread.interrupted())
                throw IllegalStateException("cancelling the task should not interrupt the thread")
        }, 100, TimeUnit.MILLISECONDS)
        cdl1.await()
        handle2.dispose()
        cdl2.countDown()
    }

    /**
     * Test cancelling [Scheduler.scheduleDirect] for [CoroutineDispatcher.asScheduler].
     */
    @Test
    fun testCancellingDirect(): Unit = runSchedulerTest { scheduler ->
        checkCancelling(scheduler::scheduleDirect)
    }

    /**
     * Test cancelling [Scheduler.Worker.schedule] for [CoroutineDispatcher.asScheduler].
     */
    @Test
    fun testCancellingWorker(): Unit = runSchedulerTest { scheduler ->
        val worker = scheduler.createWorker()
        checkCancelling(worker::schedule)
    }

    /**
     * Test shutting down [CoroutineDispatcher.asScheduler].
     *
     * Starts one blocking task per pool thread, shuts the scheduler down once all of them are running,
     * and expects every task to observe an [InterruptedException] within a second.
     */
    @Test
    fun testShuttingDown() {
        val n = 5
        runSchedulerTest(nThreads = n) { scheduler ->
            val cdl1 = CountDownLatch(n) // counted down by each task when it starts
            val cdl2 = CountDownLatch(1) // what the tasks block on
            val cdl3 = CountDownLatch(n) // counted down by each task that gets interrupted
            repeat(n) {
                scheduler.scheduleDirect {
                    cdl1.countDown()
                    try {
                        cdl2.await()
                    } catch (e: InterruptedException) {
                        // this is the expected outcome
                        cdl3.countDown()
                    }
                }
            }
            cdl1.await()
            scheduler.shutdown()
            if (!cdl3.await(1, TimeUnit.SECONDS)) {
                // release the tasks that are still blocked before reporting the failure
                cdl2.countDown()
                error("the tasks were not cancelled when the scheduler was shut down")
            }
        }
    }

    /** Tests that there are no uncaught exceptions if [Disposable.dispose] on a worker happens when tasks are present. */
    @Test
    fun testDisposingWorker(): Unit = runTest {
        val dispatcher = currentDispatcher() as CoroutineDispatcher
        val scheduler = dispatcher.asScheduler()
        val worker = scheduler.createWorker()
        yield() // so that the worker starts waiting on the channel
        assertFalse(worker.isDisposed)
        worker.dispose()
        assertTrue(worker.isDisposed)
    }

    /** Tests trying to use a [Scheduler.Worker]/[Scheduler] after [Scheduler.Worker.dispose]/[Scheduler.shutdown]. */
    @Test
    fun testSchedulingAfterDisposing() = runSchedulerTest { scheduler ->
        expect(1)
        val worker = scheduler.createWorker()
        try {
            // use CDL to ensure that the worker has properly initialized
            val cdl1 = CountDownLatch(1)
            setScheduler(2, 3)
            val disposable1 = worker.schedule {
                cdl1.countDown()
            }
            cdl1.await()
            expect(4)
            assertFalse(disposable1.isDisposed)
            setScheduler(6, -1)
            // check that the worker automatically disposes of the tasks after being disposed
            assertFalse(worker.isDisposed)
            worker.dispose()
            assertTrue(worker.isDisposed)
            expect(5)
            val disposable2 = worker.schedule {
                expectUnreached()
            }
            assertTrue(disposable2.isDisposed)
            setScheduler(7, 8)
            // ensure that the scheduler still works
            val cdl2 = CountDownLatch(1)
            val disposable3 = scheduler.scheduleDirect {
                cdl2.countDown()
            }
            cdl2.await()
            expect(9)
            assertFalse(disposable3.isDisposed)
            // check that the scheduler automatically disposes of the tasks after being shut down
            scheduler.shutdown()
            setScheduler(10, -1)
            val disposable4 = scheduler.scheduleDirect {
                expectUnreached()
            }
            assertTrue(disposable4.isDisposed)
        } finally {
            // the schedule handler is global: always remove it so that a failure here cannot affect other tests
            RxJavaPlugins.setScheduleHandler(null)
        }
        finish(11)
    }

    /** Tests [Scheduler.scheduleDirect] without a delay on a scheduler made from the test's own dispatcher. */
    @Test
    fun testSchedulerWithNoDelay(): Unit = runTest {
        val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
        testRunnableWithNoDelay(scheduler::scheduleDirect)
    }

    /** Tests [Scheduler.Worker.schedule] without a delay on a scheduler made from the test's own dispatcher. */
    @Test
    fun testSchedulerWorkerWithNoDelay(): Unit = runTest {
        val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
        testRunnableWithNoDelay(scheduler.createWorker()::schedule)
    }

    /**
     * Submits a task through [block] and suspends until the task has run and resumed the calling coroutine.
     */
    private suspend fun testRunnableWithNoDelay(block: RxSchedulerBlockNoDelay) {
        expect(1)
        suspendCancellableCoroutine<Unit> { continuation ->
            block(Runnable {
                expect(2)
                continuation.resume(Unit)
            })
        }
        yield()
        finish(3)
    }

    /** Tests [Scheduler.scheduleDirect] with a 300 ms delay on a scheduler made from the test's own dispatcher. */
    @Test
    fun testSchedulerWithDelay(): Unit = runTest {
        val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
        testRunnableWithDelay(scheduler::scheduleDirect, 300)
    }

    /** Tests [Scheduler.Worker.schedule] with a 300 ms delay on a scheduler made from the test's own dispatcher. */
    @Test
    fun testSchedulerWorkerWithDelay(): Unit = runTest {
        val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
        testRunnableWithDelay(scheduler.createWorker()::schedule, 300)
    }

    /** Tests the delayed overload of [Scheduler.scheduleDirect] with a delay of zero. */
    @Test
    fun testSchedulerWithZeroDelay(): Unit = runTest {
        val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
        testRunnableWithDelay(scheduler::scheduleDirect)
    }

    /** Tests the delayed overload of [Scheduler.Worker.schedule] with a delay of zero. */
    @Test
    fun testSchedulerWorkerWithZeroDelay(): Unit = runTest {
        val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
        testRunnableWithDelay(scheduler.createWorker()::schedule)
    }

    /**
     * Submits a task through [block] with a delay of [delayMillis] milliseconds and suspends until the task
     * has run and resumed the calling coroutine.
     */
    private suspend fun testRunnableWithDelay(block: RxSchedulerBlockWithDelay, delayMillis: Long = 0) {
        expect(1)
        suspendCancellableCoroutine<Unit> { continuation ->
            block({
                expect(2)
                continuation.resume(Unit)
            }, delayMillis, TimeUnit.MILLISECONDS)
        }
        finish(3)
    }

    /** Tests that a task passed to [Scheduler.scheduleDirect] with a negative delay still runs. */
    @Test
    fun testAsSchedulerWithNegativeDelay(): Unit = runTest {
        val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
        testRunnableWithDelay(scheduler::scheduleDirect, -1)
    }

    /** Tests that a task passed to [Scheduler.Worker.schedule] with a negative delay still runs. */
    @Test
    fun testAsSchedulerWorkerWithNegativeDelay(): Unit = runTest {
        val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
        testRunnableWithDelay(scheduler.createWorker()::schedule, -1)
    }

    /** Tests disposing of a [Scheduler.scheduleDirect] task immediately after scheduling it. */
    @Test
    fun testSchedulerImmediateDispose(): Unit = runTest {
        val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
        testRunnableImmediateDispose(scheduler::scheduleDirect)
    }

    /** Tests disposing of a [Scheduler.Worker.schedule] task immediately after scheduling it. */
    @Test
    fun testSchedulerWorkerImmediateDispose(): Unit = runTest {
        val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler()
        testRunnableImmediateDispose(scheduler.createWorker()::schedule)
    }

    /**
     * Submits a task through [block] and disposes of it right away; the task is expected never to run.
     */
    private fun testRunnableImmediateDispose(block: RxSchedulerBlockNoDelay) {
        val disposable = block {
            expectUnreached()
        }
        disposable.dispose()
    }

    /** Tests that converting a [Scheduler] to a dispatcher and back yields the very same [Scheduler] instance. */
    @Test
    fun testConvertDispatcherToOriginalScheduler(): Unit = runTest {
        val originalScheduler = Schedulers.io()
        val dispatcher = originalScheduler.asCoroutineDispatcher()
        val scheduler = dispatcher.asScheduler()
        assertSame(originalScheduler, scheduler)
    }

    /** Tests that converting a [CoroutineDispatcher] to a scheduler and back yields the very same dispatcher instance. */
    @Test
    fun testConvertSchedulerToOriginalDispatcher(): Unit = runTest {
        val originalDispatcher = currentDispatcher() as CoroutineDispatcher
        val scheduler = originalDispatcher.asScheduler()
        val dispatcher = scheduler.asCoroutineDispatcher()
        assertSame(originalDispatcher, dispatcher)
    }

    /** Tests that [Scheduler.scheduleDirect] passes its task through the [RxJavaPlugins] schedule handler. */
    @Test
    fun testSchedulerExpectRxPluginsCall(): Unit = runTest {
        val dispatcher = currentDispatcher() as CoroutineDispatcher
        val scheduler = dispatcher.asScheduler()
        testRunnableExpectRxPluginsCall(scheduler::scheduleDirect)
    }

    /** Tests that [Scheduler.Worker.schedule] passes its task through the [RxJavaPlugins] schedule handler. */
    @Test
    fun testSchedulerWorkerExpectRxPluginsCall(): Unit = runTest {
        val dispatcher = currentDispatcher() as CoroutineDispatcher
        val scheduler = dispatcher.asScheduler()
        testRunnableExpectRxPluginsCall(scheduler.createWorker()::schedule)
    }

    /**
     * Installs a schedule handler via [setScheduler], submits a task through [block] and checks the order of
     * events: the handler is called on scheduling (2), the scheduling call returns (3), the wrapper returned by
     * the handler runs (4), and then the task itself runs (5).
     *
     * The handler is removed in a `finally` block because it is global state shared by all tests.
     */
    private suspend fun testRunnableExpectRxPluginsCall(block: RxSchedulerBlockNoDelay) {
        expect(1)
        setScheduler(2, 4)
        try {
            suspendCancellableCoroutine<Unit> { continuation ->
                block(Runnable {
                    expect(5)
                    continuation.resume(Unit)
                })
                expect(3)
            }
        } finally {
            RxJavaPlugins.setScheduleHandler(null)
        }
        finish(6)
    }

    /** Same as [testSchedulerExpectRxPluginsCall], but for the delayed overload of [Scheduler.scheduleDirect]. */
    @Test
    fun testSchedulerExpectRxPluginsCallWithDelay(): Unit = runTest {
        val dispatcher = currentDispatcher() as CoroutineDispatcher
        val scheduler = dispatcher.asScheduler()
        testRunnableExpectRxPluginsCallDelay(scheduler::scheduleDirect)
    }

    /** Same as [testSchedulerWorkerExpectRxPluginsCall], but for the delayed overload of [Scheduler.Worker.schedule]. */
    @Test
    fun testSchedulerWorkerExpectRxPluginsCallWithDelay(): Unit = runTest {
        val dispatcher = currentDispatcher() as CoroutineDispatcher
        val scheduler = dispatcher.asScheduler()
        val worker = scheduler.createWorker()
        testRunnableExpectRxPluginsCallDelay(worker::schedule)
    }

    /**
     * The delayed counterpart of [testRunnableExpectRxPluginsCall]: the task is submitted with a 10 ms delay,
     * and the same order of events is expected.
     *
     * The handler is removed in a `finally` block because it is global state shared by all tests.
     */
    private suspend fun testRunnableExpectRxPluginsCallDelay(block: RxSchedulerBlockWithDelay) {
        expect(1)
        setScheduler(2, 4)
        try {
            suspendCancellableCoroutine<Unit> { continuation ->
                block({
                    expect(5)
                    continuation.resume(Unit)
                }, 10, TimeUnit.MILLISECONDS)
                expect(3)
            }
        } finally {
            RxJavaPlugins.setScheduleHandler(null)
        }
        finish(6)
    }

    /**
     * Installs a global [RxJavaPlugins] schedule handler that calls `expect(expectedCountOnSchedule)` when a task
     * is handed to it and wraps the task so that `expect(expectCountOnRun)` is called right before the task runs.
     *
     * Callers are responsible for removing the handler with `RxJavaPlugins.setScheduleHandler(null)`.
     */
    private fun setScheduler(expectedCountOnSchedule: Int, expectCountOnRun: Int) {
        RxJavaPlugins.setScheduleHandler {
            expect(expectedCountOnSchedule)
            Runnable {
                expect(expectCountOnRun)
                it.run()
            }
        }
    }

    /**
     * Tests that [Scheduler.Worker] runs all work sequentially.
     *
     * Schedules `iterations + 1` tasks, each expecting to run at its own position, followed by one more task
     * that resumes the test.
     */
    @Test
    fun testWorkerSequentialOrdering(): Unit = runTest {
        expect(1)
        val scheduler = Dispatchers.Default.asScheduler()
        val worker = scheduler.createWorker()
        val iterations = 100
        for (i in 0..iterations) {
            worker.schedule {
                expect(2 + i)
            }
        }
        suspendCoroutine<Unit> { continuation ->
            worker.schedule {
                continuation.resume(Unit)
            }
        }
        finish((iterations + 2) + 1)
    }

    /**
     * Test that ensures that delays are actually respected (tasks scheduled sooner in the future run before tasks scheduled later,
     * even when the later task is submitted before the earlier one)
     */
    @Test
    fun testSchedulerRespectsDelays(): Unit = runTest {
        val scheduler = Dispatchers.Default.asScheduler()
        testRunnableRespectsDelays(scheduler::scheduleDirect)
    }

    /** Same as [testSchedulerRespectsDelays], but for [Scheduler.Worker.schedule]. */
    @Test
    fun testSchedulerWorkerRespectsDelays(): Unit = runTest {
        val scheduler = Dispatchers.Default.asScheduler()
        testRunnableRespectsDelays(scheduler.createWorker()::schedule)
    }

    /**
     * Submits a task with a 100 ms delay and then a task with a 1 ms delay through [block], and expects the
     * second one to run first. Suspends until both tasks have released the semaphore.
     */
    private suspend fun testRunnableRespectsDelays(block: RxSchedulerBlockWithDelay) {
        expect(1)
        // both permits start out acquired, so each `acquire` below waits for one task to finish
        val semaphore = Semaphore(2, 2)
        block({
            expect(3)
            semaphore.release()
        }, 100, TimeUnit.MILLISECONDS)
        block({
            expect(2)
            semaphore.release()
        }, 1, TimeUnit.MILLISECONDS)
        semaphore.acquire()
        semaphore.acquire()
        finish(4)
    }

    /**
     * Tests that cancelling a runnable in one worker doesn't affect work in another worker
     * of the same scheduler.
     *
     * This is part of the documented expected behavior.
     */
    @Test
    fun testMultipleWorkerCancellation(): Unit = runTest {
        expect(1)
        val dispatcher = currentDispatcher() as CoroutineDispatcher
        val scheduler = dispatcher.asScheduler()
        suspendCancellableCoroutine<Unit> { continuation ->
            val workerOne = scheduler.createWorker()
            workerOne.schedule({
                expect(3)
                continuation.resume(Unit)
            }, 50, TimeUnit.MILLISECONDS)
            val workerTwo = scheduler.createWorker()
            workerTwo.schedule({
                expectUnreached()
            }, 1000, TimeUnit.MILLISECONDS)
            // disposing of the second worker must cancel only its own task
            workerTwo.dispose()
            expect(2)
        }
        finish(4)
    }
}
491 
/** A scheduling function that accepts a task to run without a delay and returns the [Disposable] handle for it. */
typealias RxSchedulerBlockNoDelay = (task: Runnable) -> Disposable

/** A scheduling function that accepts a task, a delay amount and the [TimeUnit] of that delay, and returns the [Disposable] handle for the task. */
typealias RxSchedulerBlockWithDelay = (task: Runnable, delay: Long, unit: TimeUnit) -> Disposable