1 /* 2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx2 6 7 import kotlinx.coroutines.* 8 import org.junit.* 9 import java.util.concurrent.* 10 11 class SchedulerStressTest : TestBase() { 12 @Before setupnull13 fun setup() { 14 ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") 15 } 16 17 /** 18 * Test that we don't get an OOM if we schedule many jobs at once. 19 * It's expected that if you don't dispose you'd see an OOM error. 20 */ 21 @Test <lambda>null22 fun testSchedulerDisposed(): Unit = runTest { 23 val dispatcher = currentDispatcher() as CoroutineDispatcher 24 val scheduler = dispatcher.asScheduler() 25 testRunnableDisposed(scheduler::scheduleDirect) 26 } 27 28 @Test <lambda>null29 fun testSchedulerWorkerDisposed(): Unit = runTest { 30 val dispatcher = currentDispatcher() as CoroutineDispatcher 31 val scheduler = dispatcher.asScheduler() 32 val worker = scheduler.createWorker() 33 testRunnableDisposed(worker::schedule) 34 } 35 testRunnableDisposednull36 private suspend fun testRunnableDisposed(block: RxSchedulerBlockNoDelay) { 37 val n = 2000 * stressTestMultiplier 38 repeat(n) { 39 val a = ByteArray(1000000) //1MB 40 val disposable = block(Runnable { 41 keepMe(a) 42 expectUnreached() 43 }) 44 disposable.dispose() 45 yield() // allow the scheduled task to observe that it was disposed 46 } 47 } 48 49 /** 50 * Test function that holds a reference. Used for testing OOM situations 51 */ keepMenull52 private fun keepMe(a: ByteArray) { 53 Thread.sleep(a.size / (a.size + 1) + 10L) 54 } 55 56 /** 57 * Test that we don't get an OOM if we schedule many delayed jobs at once. It's expected that if you don't dispose that you'd 58 * see a OOM error. 59 */ 60 @Test <lambda>null61 fun testSchedulerDisposedDuringDelay(): Unit = runTest { 62 val dispatcher = currentDispatcher() as CoroutineDispatcher 63 val scheduler = dispatcher.asScheduler() 64 testRunnableDisposedDuringDelay(scheduler::scheduleDirect) 65 } 66 67 @Test <lambda>null68 fun testSchedulerWorkerDisposedDuringDelay(): Unit = runTest { 69 val dispatcher = currentDispatcher() as CoroutineDispatcher 70 val scheduler = dispatcher.asScheduler() 71 val worker = scheduler.createWorker() 72 testRunnableDisposedDuringDelay(worker::schedule) 73 } 74 testRunnableDisposedDuringDelaynull75 private fun testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) { 76 val n = 2000 * stressTestMultiplier 77 repeat(n) { 78 val a = ByteArray(1000000) //1MB 79 val delayMillis: Long = 10 80 val disposable = block(Runnable { 81 keepMe(a) 82 expectUnreached() 83 }, delayMillis, TimeUnit.MILLISECONDS) 84 disposable.dispose() 85 } 86 } 87 }