1 package kotlinx.coroutines.rx2 2 3 import kotlinx.coroutines.testing.* 4 import kotlinx.coroutines.* 5 import org.junit.* 6 import java.util.concurrent.* 7 8 class SchedulerStressTest : TestBase() { 9 @Before setupnull10 fun setup() { 11 ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") 12 } 13 14 /** 15 * Test that we don't get an OOM if we schedule many jobs at once. 16 * It's expected that if you don't dispose you'd see an OOM error. 17 */ 18 @Test <lambda>null19 fun testSchedulerDisposed(): Unit = runTest { 20 val dispatcher = currentDispatcher() as CoroutineDispatcher 21 val scheduler = dispatcher.asScheduler() 22 testRunnableDisposed(scheduler::scheduleDirect) 23 } 24 25 @Test <lambda>null26 fun testSchedulerWorkerDisposed(): Unit = runTest { 27 val dispatcher = currentDispatcher() as CoroutineDispatcher 28 val scheduler = dispatcher.asScheduler() 29 val worker = scheduler.createWorker() 30 testRunnableDisposed(worker::schedule) 31 } 32 testRunnableDisposednull33 private suspend fun testRunnableDisposed(block: RxSchedulerBlockNoDelay) { 34 val n = 2000 * stressTestMultiplier 35 repeat(n) { 36 val a = ByteArray(1000000) //1MB 37 val disposable = block(Runnable { 38 keepMe(a) 39 expectUnreached() 40 }) 41 disposable.dispose() 42 yield() // allow the scheduled task to observe that it was disposed 43 } 44 } 45 46 /** 47 * Test function that holds a reference. Used for testing OOM situations 48 */ keepMenull49 private fun keepMe(a: ByteArray) { 50 Thread.sleep(a.size / (a.size + 1) + 10L) 51 } 52 53 /** 54 * Test that we don't get an OOM if we schedule many delayed jobs at once. It's expected that if you don't dispose that you'd 55 * see a OOM error. 56 */ 57 @Test <lambda>null58 fun testSchedulerDisposedDuringDelay(): Unit = runTest { 59 val dispatcher = currentDispatcher() as CoroutineDispatcher 60 val scheduler = dispatcher.asScheduler() 61 testRunnableDisposedDuringDelay(scheduler::scheduleDirect) 62 } 63 64 @Test <lambda>null65 fun testSchedulerWorkerDisposedDuringDelay(): Unit = runTest { 66 val dispatcher = currentDispatcher() as CoroutineDispatcher 67 val scheduler = dispatcher.asScheduler() 68 val worker = scheduler.createWorker() 69 testRunnableDisposedDuringDelay(worker::schedule) 70 } 71 testRunnableDisposedDuringDelaynull72 private fun testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) { 73 val n = 2000 * stressTestMultiplier 74 repeat(n) { 75 val a = ByteArray(1000000) //1MB 76 val delayMillis: Long = 10 77 val disposable = block(Runnable { 78 keepMe(a) 79 expectUnreached() 80 }, delayMillis, TimeUnit.MILLISECONDS) 81 disposable.dispose() 82 } 83 } 84 }