• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx3
6 
7 import kotlinx.coroutines.*
8 import org.junit.*
9 import java.util.concurrent.*
10 
/**
 * Stress tests for Rx schedulers obtained from the current coroutine dispatcher via `asScheduler()`.
 *
 * Every test schedules a large number of runnables, each capturing a 1 MB array, and disposes
 * each one right after scheduling it. If disposal failed to release the scheduled work, the
 * expected symptom would be an OOM error.
 */
class SchedulerStressTest : TestBase() {
    @Before
    fun setup() {
        // Rx thread-name prefixes handed to ignoreLostThreads.
        // NOTE(review): presumably these threads may outlive a test and must not be reported
        // as leaked; confirm against TestBase.
        ignoreLostThreads(
            "RxCachedThreadScheduler-",
            "RxCachedWorkerPoolEvictor-",
            "RxSchedulerPurge-"
        )
    }

    /**
     * Converts the dispatcher returned by `currentDispatcher()` into an Rx scheduler.
     */
    private suspend fun schedulerOfCurrentDispatcher() =
        (currentDispatcher() as CoroutineDispatcher).asScheduler()

    /**
     * Checks that scheduling many jobs directly on the scheduler and disposing each of them
     * does not end in an OOM. Without the dispose call an OOM error would be expected.
     */
    @Test
    fun testSchedulerDisposed(): Unit = runTest {
        val rxScheduler = schedulerOfCurrentDispatcher()
        testRunnableDisposed(rxScheduler::scheduleDirect)
    }

    /**
     * Same check as [testSchedulerDisposed], but the jobs are scheduled through a worker
     * created from the scheduler.
     */
    @Test
    fun testSchedulerWorkerDisposed(): Unit = runTest {
        val rxWorker = schedulerOfCurrentDispatcher().createWorker()
        testRunnableDisposed(rxWorker::schedule)
    }

    /**
     * Schedules `2000 * stressTestMultiplier` runnables through [block], disposing each one
     * immediately after it has been scheduled.
     *
     * @param block the scheduling function under test; it receives the runnable and returns
     * the handle that gets disposed.
     */
    private suspend fun testRunnableDisposed(block: RxSchedulerBlockNoDelay) {
        val iterations = 2000 * stressTestMultiplier
        repeat(iterations) {
            val payload = ByteArray(1_000_000) // 1 MB captured by the task below
            val task = Runnable {
                keepMe(payload)
                // The task is disposed right away, so its body is not expected to run.
                expectUnreached()
            }
            block(task).dispose()
            yield() // allow the scheduled task to observe that it was disposed
        }
    }

    /**
     * Test helper whose only purpose is to hold a reference to [a]; used for testing OOM
     * situations.
     */
    private fun keepMe(a: ByteArray) {
        // Integer division of size by (size + 1) is 0 for the array sizes used here, so this
        // sleeps for 10 ms while still reading the array.
        val zeroFromArray = a.size / (a.size + 1)
        Thread.sleep(zeroFromArray + 10L)
    }

    /**
     * Checks that scheduling many delayed jobs directly on the scheduler and disposing each of
     * them does not end in an OOM. Without the dispose call an OOM error would be expected.
     */
    @Test
    fun testSchedulerDisposedDuringDelay(): Unit = runTest {
        val rxScheduler = schedulerOfCurrentDispatcher()
        testRunnableDisposedDuringDelay(rxScheduler::scheduleDirect)
    }

    /**
     * Same check as [testSchedulerDisposedDuringDelay], but the delayed jobs are scheduled
     * through a worker created from the scheduler.
     */
    @Test
    fun testSchedulerWorkerDisposedDuringDelay(): Unit = runTest {
        val rxWorker = schedulerOfCurrentDispatcher().createWorker()
        testRunnableDisposedDuringDelay(rxWorker::schedule)
    }

    /**
     * Schedules `2000 * stressTestMultiplier` runnables through [block] with a 10 ms delay,
     * disposing each one immediately after it has been scheduled.
     *
     * @param block the delayed scheduling function under test; it receives the runnable, the
     * delay and its unit, and returns the handle that gets disposed.
     */
    private fun testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) {
        val iterations = 2000 * stressTestMultiplier
        repeat(iterations) {
            val payload = ByteArray(1_000_000) // 1 MB captured by the task below
            val delayMillis = 10L
            val task = Runnable {
                keepMe(payload)
                // The task is disposed right away, so its body is not expected to run.
                expectUnreached()
            }
            block(task, delayMillis, TimeUnit.MILLISECONDS).dispose()
        }
    }
}
88