1 /* <lambda>null2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx2 6 7 import io.reactivex.* 8 import io.reactivex.disposables.* 9 import io.reactivex.plugins.* 10 import io.reactivex.schedulers.* 11 import kotlinx.coroutines.* 12 import kotlinx.coroutines.sync.* 13 import org.junit.* 14 import org.junit.Test 15 import java.lang.Runnable 16 import java.util.concurrent.* 17 import java.util.concurrent.atomic.AtomicReference 18 import kotlin.coroutines.* 19 import kotlin.test.* 20 21 class SchedulerTest : TestBase() { 22 @Before 23 fun setup() { 24 ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") 25 } 26 27 @Test 28 fun testIoScheduler(): Unit = runTest { 29 expect(1) 30 val mainThread = Thread.currentThread() 31 withContext(Schedulers.io().asCoroutineDispatcher()) { 32 val t1 = Thread.currentThread() 33 assertNotSame(t1, mainThread) 34 expect(2) 35 delay(100) 36 val t2 = Thread.currentThread() 37 assertNotSame(t2, mainThread) 38 expect(3) 39 } 40 finish(4) 41 } 42 43 /** Tests [toString] implementations of [CoroutineDispatcher.asScheduler] and its [Scheduler.Worker]. */ 44 @Test 45 fun testSchedulerToString() { 46 val name = "Dispatchers.Default" 47 val scheduler = Dispatchers.Default.asScheduler() 48 assertContains(scheduler.toString(), name) 49 val worker = scheduler.createWorker() 50 val activeWorkerName = worker.toString() 51 assertContains(worker.toString(), name) 52 worker.dispose() 53 val disposedWorkerName = worker.toString() 54 assertNotEquals(activeWorkerName, disposedWorkerName) 55 } 56 57 private fun runSchedulerTest(nThreads: Int = 1, action: (Scheduler) -> Unit) { 58 val future = CompletableFuture<Unit>() 59 try { 60 newFixedThreadPoolContext(nThreads, "test").use { dispatcher -> 61 RxJavaPlugins.setErrorHandler { 62 if (!future.completeExceptionally(it)) { 63 handleUndeliverableException(it, dispatcher) 64 } 65 } 66 action(dispatcher.asScheduler()) 67 } 68 } finally { 69 RxJavaPlugins.setErrorHandler(null) 70 } 71 future.complete(Unit) 72 future.getNow(Unit) // rethrow any encountered errors 73 } 74 75 private fun ensureSeparateThread(schedule: (Runnable, Long, TimeUnit) -> Unit, scheduleNoDelay: (Runnable) -> Unit) { 76 val mainThread = Thread.currentThread() 77 val cdl1 = CountDownLatch(1) 78 val cdl2 = CountDownLatch(1) 79 expect(1) 80 val thread = AtomicReference<Thread?>(null) 81 fun checkThread() { 82 val current = Thread.currentThread() 83 thread.getAndSet(current)?.let { assertEquals(it, current) } 84 } 85 schedule({ 86 assertNotSame(mainThread, Thread.currentThread()) 87 checkThread() 88 cdl2.countDown() 89 }, 300, TimeUnit.MILLISECONDS) 90 scheduleNoDelay { 91 expect(2) 92 checkThread() 93 assertNotSame(mainThread, Thread.currentThread()) 94 cdl1.countDown() 95 } 96 cdl1.await() 97 cdl2.await() 98 finish(3) 99 } 100 101 /** 102 * Tests [Scheduler.scheduleDirect] for [CoroutineDispatcher.asScheduler] on a single-threaded dispatcher. 103 */ 104 @Test 105 fun testSingleThreadedDispatcherDirect(): Unit = runSchedulerTest(1) { 106 ensureSeparateThread(it::scheduleDirect, it::scheduleDirect) 107 } 108 109 /** 110 * Tests [Scheduler.Worker.schedule] for [CoroutineDispatcher.asScheduler] running its tasks on the correct thread. 111 */ 112 @Test 113 fun testSingleThreadedWorker(): Unit = runSchedulerTest(1) { 114 val worker = it.createWorker() 115 ensureSeparateThread(worker::schedule, worker::schedule) 116 } 117 118 private fun checkCancelling(schedule: (Runnable, Long, TimeUnit) -> Disposable) { 119 // cancel the task before it has a chance to run. 120 val handle1 = schedule({ 121 throw IllegalStateException("should have been successfully cancelled") 122 }, 10_000, TimeUnit.MILLISECONDS) 123 handle1.dispose() 124 // cancel the task after it started running. 125 val cdl1 = CountDownLatch(1) 126 val cdl2 = CountDownLatch(1) 127 val handle2 = schedule({ 128 cdl1.countDown() 129 cdl2.await() 130 if (Thread.interrupted()) 131 throw IllegalStateException("cancelling the task should not interrupt the thread") 132 }, 100, TimeUnit.MILLISECONDS) 133 cdl1.await() 134 handle2.dispose() 135 cdl2.countDown() 136 } 137 138 /** 139 * Test cancelling [Scheduler.scheduleDirect] for [CoroutineDispatcher.asScheduler]. 140 */ 141 @Test 142 fun testCancellingDirect(): Unit = runSchedulerTest { 143 checkCancelling(it::scheduleDirect) 144 } 145 146 /** 147 * Test cancelling [Scheduler.Worker.schedule] for [CoroutineDispatcher.asScheduler]. 148 */ 149 @Test 150 fun testCancellingWorker(): Unit = runSchedulerTest { 151 val worker = it.createWorker() 152 checkCancelling(worker::schedule) 153 } 154 155 /** 156 * Test shutting down [CoroutineDispatcher.asScheduler]. 157 */ 158 @Test 159 fun testShuttingDown() { 160 val n = 5 161 runSchedulerTest(nThreads = n) { scheduler -> 162 val cdl1 = CountDownLatch(n) 163 val cdl2 = CountDownLatch(1) 164 val cdl3 = CountDownLatch(n) 165 repeat(n) { 166 scheduler.scheduleDirect { 167 cdl1.countDown() 168 try { 169 cdl2.await() 170 } catch (e: InterruptedException) { 171 // this is the expected outcome 172 cdl3.countDown() 173 } 174 } 175 } 176 cdl1.await() 177 scheduler.shutdown() 178 if (!cdl3.await(1, TimeUnit.SECONDS)) { 179 cdl2.countDown() 180 error("the tasks were not cancelled when the scheduler was shut down") 181 } 182 } 183 } 184 185 /** Tests that there are no uncaught exceptions if [Disposable.dispose] on a worker happens when tasks are present. */ 186 @Test 187 fun testDisposingWorker() = runTest { 188 val dispatcher = currentDispatcher() as CoroutineDispatcher 189 val scheduler = dispatcher.asScheduler() 190 val worker = scheduler.createWorker() 191 yield() // so that the worker starts waiting on the channel 192 assertFalse(worker.isDisposed) 193 worker.dispose() 194 assertTrue(worker.isDisposed) 195 } 196 197 /** Tests trying to use a [Scheduler.Worker]/[Scheduler] after [Scheduler.Worker.dispose]/[Scheduler.shutdown]. */ 198 @Test 199 fun testSchedulingAfterDisposing() = runSchedulerTest { 200 expect(1) 201 val worker = it.createWorker() 202 // use CDL to ensure that the worker has properly initialized 203 val cdl1 = CountDownLatch(1) 204 setScheduler(2, 3) 205 val disposable1 = worker.schedule { 206 cdl1.countDown() 207 } 208 cdl1.await() 209 expect(4) 210 assertFalse(disposable1.isDisposed) 211 setScheduler(6, -1) 212 // check that the worker automatically disposes of the tasks after being disposed 213 assertFalse(worker.isDisposed) 214 worker.dispose() 215 assertTrue(worker.isDisposed) 216 expect(5) 217 val disposable2 = worker.schedule { 218 expectUnreached() 219 } 220 assertTrue(disposable2.isDisposed) 221 setScheduler(7, 8) 222 // ensure that the scheduler still works 223 val cdl2 = CountDownLatch(1) 224 val disposable3 = it.scheduleDirect { 225 cdl2.countDown() 226 } 227 cdl2.await() 228 expect(9) 229 assertFalse(disposable3.isDisposed) 230 // check that the scheduler automatically disposes of the tasks after being shut down 231 it.shutdown() 232 setScheduler(10, -1) 233 val disposable4 = it.scheduleDirect { 234 expectUnreached() 235 } 236 assertTrue(disposable4.isDisposed) 237 RxJavaPlugins.setScheduleHandler(null) 238 finish(11) 239 } 240 241 @Test 242 fun testSchedulerWithNoDelay(): Unit = runTest { 243 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() 244 testRunnableWithNoDelay(scheduler::scheduleDirect) 245 } 246 247 @Test 248 fun testSchedulerWorkerWithNoDelay(): Unit = runTest { 249 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() 250 testRunnableWithNoDelay(scheduler.createWorker()::schedule) 251 } 252 253 private suspend fun testRunnableWithNoDelay(block: RxSchedulerBlockNoDelay) { 254 expect(1) 255 suspendCancellableCoroutine<Unit> { 256 block(Runnable { 257 expect(2) 258 it.resume(Unit) 259 }) 260 } 261 yield() 262 finish(3) 263 } 264 265 @Test 266 fun testSchedulerWithDelay(): Unit = runTest { 267 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() 268 testRunnableWithDelay(scheduler::scheduleDirect, 300) 269 } 270 271 @Test 272 fun testSchedulerWorkerWithDelay(): Unit = runTest { 273 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() 274 testRunnableWithDelay(scheduler.createWorker()::schedule, 300) 275 } 276 277 @Test 278 fun testSchedulerWithZeroDelay(): Unit = runTest { 279 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() 280 testRunnableWithDelay(scheduler::scheduleDirect) 281 } 282 283 @Test 284 fun testSchedulerWorkerWithZeroDelay(): Unit = runTest { 285 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() 286 testRunnableWithDelay(scheduler.createWorker()::schedule) 287 } 288 289 private suspend fun testRunnableWithDelay(block: RxSchedulerBlockWithDelay, delayMillis: Long = 0) { 290 expect(1) 291 suspendCancellableCoroutine<Unit> { 292 block({ 293 expect(2) 294 it.resume(Unit) 295 }, delayMillis, TimeUnit.MILLISECONDS) 296 } 297 finish(3) 298 } 299 300 @Test 301 fun testAsSchedulerWithNegativeDelay(): Unit = runTest { 302 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() 303 testRunnableWithDelay(scheduler::scheduleDirect, -1) 304 } 305 306 @Test 307 fun testAsSchedulerWorkerWithNegativeDelay(): Unit = runTest { 308 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() 309 testRunnableWithDelay(scheduler.createWorker()::schedule, -1) 310 } 311 312 @Test 313 fun testSchedulerImmediateDispose(): Unit = runTest { 314 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() 315 testRunnableImmediateDispose(scheduler::scheduleDirect) 316 } 317 318 @Test 319 fun testSchedulerWorkerImmediateDispose(): Unit = runTest { 320 val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() 321 testRunnableImmediateDispose(scheduler.createWorker()::schedule) 322 } 323 324 private fun testRunnableImmediateDispose(block: RxSchedulerBlockNoDelay) { 325 val disposable = block { 326 expectUnreached() 327 } 328 disposable.dispose() 329 } 330 331 @Test 332 fun testConvertDispatcherToOriginalScheduler(): Unit = runTest { 333 val originalScheduler = Schedulers.io() 334 val dispatcher = originalScheduler.asCoroutineDispatcher() 335 val scheduler = dispatcher.asScheduler() 336 assertSame(originalScheduler, scheduler) 337 } 338 339 @Test 340 fun testConvertSchedulerToOriginalDispatcher(): Unit = runTest { 341 val originalDispatcher = currentDispatcher() as CoroutineDispatcher 342 val scheduler = originalDispatcher.asScheduler() 343 val dispatcher = scheduler.asCoroutineDispatcher() 344 assertSame(originalDispatcher, dispatcher) 345 } 346 347 @Test 348 fun testSchedulerExpectRxPluginsCall(): Unit = runTest { 349 val dispatcher = currentDispatcher() as CoroutineDispatcher 350 val scheduler = dispatcher.asScheduler() 351 testRunnableExpectRxPluginsCall(scheduler::scheduleDirect) 352 } 353 354 @Test 355 fun testSchedulerWorkerExpectRxPluginsCall(): Unit = runTest { 356 val dispatcher = currentDispatcher() as CoroutineDispatcher 357 val scheduler = dispatcher.asScheduler() 358 testRunnableExpectRxPluginsCall(scheduler.createWorker()::schedule) 359 } 360 361 private suspend fun testRunnableExpectRxPluginsCall(block: RxSchedulerBlockNoDelay) { 362 expect(1) 363 setScheduler(2, 4) 364 suspendCancellableCoroutine<Unit> { 365 block(Runnable { 366 expect(5) 367 it.resume(Unit) 368 }) 369 expect(3) 370 } 371 RxJavaPlugins.setScheduleHandler(null) 372 finish(6) 373 } 374 375 @Test 376 fun testSchedulerExpectRxPluginsCallWithDelay(): Unit = runTest { 377 val dispatcher = currentDispatcher() as CoroutineDispatcher 378 val scheduler = dispatcher.asScheduler() 379 testRunnableExpectRxPluginsCallDelay(scheduler::scheduleDirect) 380 } 381 382 @Test 383 fun testSchedulerWorkerExpectRxPluginsCallWithDelay(): Unit = runTest { 384 val dispatcher = currentDispatcher() as CoroutineDispatcher 385 val scheduler = dispatcher.asScheduler() 386 val worker = scheduler.createWorker() 387 testRunnableExpectRxPluginsCallDelay(worker::schedule) 388 } 389 390 private suspend fun testRunnableExpectRxPluginsCallDelay(block: RxSchedulerBlockWithDelay) { 391 expect(1) 392 setScheduler(2, 4) 393 suspendCancellableCoroutine<Unit> { 394 block({ 395 expect(5) 396 it.resume(Unit) 397 }, 10, TimeUnit.MILLISECONDS) 398 expect(3) 399 } 400 RxJavaPlugins.setScheduleHandler(null) 401 finish(6) 402 } 403 404 private fun setScheduler(expectedCountOnSchedule: Int, expectCountOnRun: Int) { 405 RxJavaPlugins.setScheduleHandler { 406 expect(expectedCountOnSchedule) 407 Runnable { 408 expect(expectCountOnRun) 409 it.run() 410 } 411 } 412 } 413 414 /** 415 * Tests that [Scheduler.Worker] runs all work sequentially. 416 */ 417 @Test 418 fun testWorkerSequentialOrdering() = runTest { 419 expect(1) 420 val scheduler = Dispatchers.Default.asScheduler() 421 val worker = scheduler.createWorker() 422 val iterations = 100 423 for (i in 0..iterations) { 424 worker.schedule { 425 expect(2 + i) 426 } 427 } 428 suspendCoroutine<Unit> { 429 worker.schedule { 430 it.resume(Unit) 431 } 432 } 433 finish((iterations + 2) + 1) 434 } 435 436 /** 437 * Test that ensures that delays are actually respected (tasks scheduled sooner in the future run before tasks scheduled later, 438 * even when the later task is submitted before the earlier one) 439 */ 440 @Test 441 fun testSchedulerRespectsDelays(): Unit = runTest { 442 val scheduler = Dispatchers.Default.asScheduler() 443 testRunnableRespectsDelays(scheduler::scheduleDirect) 444 } 445 446 @Test 447 fun testSchedulerWorkerRespectsDelays(): Unit = runTest { 448 val scheduler = Dispatchers.Default.asScheduler() 449 testRunnableRespectsDelays(scheduler.createWorker()::schedule) 450 } 451 452 private suspend fun testRunnableRespectsDelays(block: RxSchedulerBlockWithDelay) { 453 expect(1) 454 val semaphore = Semaphore(2, 2) 455 block({ 456 expect(3) 457 semaphore.release() 458 }, 100, TimeUnit.MILLISECONDS) 459 block({ 460 expect(2) 461 semaphore.release() 462 }, 1, TimeUnit.MILLISECONDS) 463 semaphore.acquire() 464 semaphore.acquire() 465 finish(4) 466 } 467 468 /** 469 * Tests that cancelling a runnable in one worker doesn't affect work in another scheduler. 470 * 471 * This is part of expected behavior documented. 472 */ 473 @Test 474 fun testMultipleWorkerCancellation(): Unit = runTest { 475 expect(1) 476 val dispatcher = currentDispatcher() as CoroutineDispatcher 477 val scheduler = dispatcher.asScheduler() 478 suspendCancellableCoroutine<Unit> { 479 val workerOne = scheduler.createWorker() 480 workerOne.schedule({ 481 expect(3) 482 it.resume(Unit) 483 }, 50, TimeUnit.MILLISECONDS) 484 val workerTwo = scheduler.createWorker() 485 workerTwo.schedule({ 486 expectUnreached() 487 }, 1000, TimeUnit.MILLISECONDS) 488 workerTwo.dispose() 489 expect(2) 490 } 491 finish(4) 492 } 493 } 494 495 typealias RxSchedulerBlockNoDelay = (Runnable) -> Disposable 496 typealias RxSchedulerBlockWithDelay = (Runnable, Long, TimeUnit) -> Disposable