• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.rx2
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import org.junit.*
6 import java.util.concurrent.*
7 
8 class SchedulerStressTest : TestBase() {
9     @Before
setupnull10     fun setup() {
11         ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
12     }
13 
14     /**
15      * Test that we don't get an OOM if we schedule many jobs at once.
16      * It's expected that if you don't dispose you'd see an OOM error.
17      */
18     @Test
<lambda>null19     fun testSchedulerDisposed(): Unit = runTest {
20         val dispatcher = currentDispatcher() as CoroutineDispatcher
21         val scheduler = dispatcher.asScheduler()
22         testRunnableDisposed(scheduler::scheduleDirect)
23     }
24 
25     @Test
<lambda>null26     fun testSchedulerWorkerDisposed(): Unit = runTest {
27         val dispatcher = currentDispatcher() as CoroutineDispatcher
28         val scheduler = dispatcher.asScheduler()
29         val worker = scheduler.createWorker()
30         testRunnableDisposed(worker::schedule)
31     }
32 
testRunnableDisposednull33     private suspend fun testRunnableDisposed(block: RxSchedulerBlockNoDelay) {
34         val n = 2000 * stressTestMultiplier
35         repeat(n) {
36             val a = ByteArray(1000000) //1MB
37             val disposable = block(Runnable {
38                 keepMe(a)
39                 expectUnreached()
40             })
41             disposable.dispose()
42             yield() // allow the scheduled task to observe that it was disposed
43         }
44     }
45 
46     /**
47      * Test function that holds a reference. Used for testing OOM situations
48      */
keepMenull49     private fun keepMe(a: ByteArray) {
50         Thread.sleep(a.size / (a.size + 1) + 10L)
51     }
52 
53     /**
54      * Test that we don't get an OOM if we schedule many delayed jobs at once. It's expected that if you don't dispose that you'd
55      * see a OOM error.
56      */
57     @Test
<lambda>null58     fun testSchedulerDisposedDuringDelay(): Unit = runTest {
59         val dispatcher = currentDispatcher() as CoroutineDispatcher
60         val scheduler = dispatcher.asScheduler()
61         testRunnableDisposedDuringDelay(scheduler::scheduleDirect)
62     }
63 
64     @Test
<lambda>null65     fun testSchedulerWorkerDisposedDuringDelay(): Unit = runTest {
66         val dispatcher = currentDispatcher() as CoroutineDispatcher
67         val scheduler = dispatcher.asScheduler()
68         val worker = scheduler.createWorker()
69         testRunnableDisposedDuringDelay(worker::schedule)
70     }
71 
testRunnableDisposedDuringDelaynull72     private fun testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) {
73         val n = 2000 * stressTestMultiplier
74         repeat(n) {
75             val a = ByteArray(1000000) //1MB
76             val delayMillis: Long = 10
77             val disposable = block(Runnable {
78                 keepMe(a)
79                 expectUnreached()
80             }, delayMillis, TimeUnit.MILLISECONDS)
81             disposable.dispose()
82         }
83     }
84 }