• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.flow.consumeAsFlow
7 import org.junit.Test
8 import org.junit.runner.*
9 import org.junit.runners.*
10 import kotlin.coroutines.*
11 import kotlin.test.*
12 
/**
 * Integration tests for the bridge between coroutines and RxJava 3 `Observable`s.
 *
 * The class runs under the JUnit [Parameterized] runner: [params] supplies every combination of
 * [Ctx] and the `delay` flag, so each test executes once per combination.
 *
 * @param ctx selects the coroutine context passed to the builders in the tests that use it.
 * @param delay when `true`, the producers in [testEmpty], [testSingle] and [testNumbers]
 * call `delay(1)` so that they suspend while producing.
 */
@RunWith(Parameterized::class)
class IntegrationTest(
    private val ctx: Ctx,
    private val delay: Boolean
) : TestBase() {

    /**
     * Strategies for deriving the context under test from the context of the calling coroutine.
     */
    enum class Ctx {
        /** Reuses the caller's context with its [Job] element removed. */
        MAIN {
            override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job)
        },

        /** Ignores the caller's context and uses [Dispatchers.Default]. */
        DEFAULT {
            override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default
        },

        /** Ignores the caller's context and uses [Dispatchers.Unconfined]. */
        UNCONFINED {
            override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined
        };

        /**
         * Maps the caller's [context] to the context that the test should run its producer in.
         */
        abstract operator fun invoke(context: CoroutineContext): CoroutineContext
    }

    companion object {
        /**
         * Builds the parameter sets for the runner: the cross product of all [Ctx] values with
         * `delay = false` and `delay = true`, each as an `arrayOf(ctx, delay)`.
         */
        @Parameterized.Parameters(name = "ctx={0}, delay={1}")
        @JvmStatic
        fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
            listOf(false, true).map { delay ->
                arrayOf(ctx, delay)
            }
        }
    }

    /**
     * An observable that completes without emitting: `awaitFirst`, `awaitLast` and `awaitSingle`
     * must fail with [NoSuchElementException], the `OrDefault`/`OrNull`/`OrElse` variants must
     * return their fallback, and `collect` must see no elements.
     */
    @Test
    fun testEmpty() = runBlocking<Unit> {
        val observable = rxObservable<String>(ctx(coroutineContext)) {
            if (delay) delay(1)
            // does not send anything
        }
        assertFailsWith<NoSuchElementException> { observable.awaitFirst() }
        assertEquals("OK", observable.awaitFirstOrDefault("OK"))
        assertNull(observable.awaitFirstOrNull())
        assertEquals("ELSE", observable.awaitFirstOrElse { "ELSE" })
        assertFailsWith<NoSuchElementException> { observable.awaitLast() }
        assertFailsWith<NoSuchElementException> { observable.awaitSingle() }
        var cnt = 0
        observable.collect {
            cnt++
        }
        assertEquals(0, cnt)
    }

    /**
     * An observable that emits exactly one element ("OK"): every `await*` variant must return it
     * and `collect` must see it exactly once.
     */
    @Test
    fun testSingle() = runBlocking<Unit> {
        val observable = rxObservable(ctx(coroutineContext)) {
            if (delay) delay(1)
            send("OK")
        }
        assertEquals("OK", observable.awaitFirst())
        assertEquals("OK", observable.awaitFirstOrDefault("OK"))
        assertEquals("OK", observable.awaitFirstOrNull())
        assertEquals("OK", observable.awaitFirstOrElse { "ELSE" })
        assertEquals("OK", observable.awaitLast())
        assertEquals("OK", observable.awaitSingle())
        var cnt = 0
        observable.collect {
            assertEquals("OK", it)
            cnt++
        }
        assertEquals(1, cnt)
    }

    /**
     * An observable that emits `1..n` (with `n = 100 * stressTestMultiplier`): the "first"
     * variants must return 1, `awaitLast` must return `n`, `awaitSingle` must fail with
     * [IllegalArgumentException], and the full sequence is verified both directly and after a
     * round-trip through a channel and a flow.
     */
    @Test
    fun testNumbers() = runBlocking<Unit> {
        val n = 100 * stressTestMultiplier
        val observable = rxObservable(ctx(coroutineContext)) {
            for (i in 1..n) {
                send(i)
                if (delay) delay(1)
            }
        }
        assertEquals(1, observable.awaitFirst())
        assertEquals(1, observable.awaitFirstOrDefault(0))
        assertEquals(1, observable.awaitFirstOrNull())
        assertEquals(1, observable.awaitFirstOrElse { 0 })
        assertEquals(n, observable.awaitLast())
        assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }
        checkNumbers(n, observable)
        val channel = observable.openSubscription()
        // Convert back to an Observable in the context under test. The context has to be passed
        // to asObservable: evaluating ctx(...) as a standalone statement has no effect.
        checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext)))
        channel.cancel()
    }

    /**
     * Cancels a coroutine while it is inside `awaitFirst()` on an observable whose producer only
     * calls `hang` (a helper not defined in this file; presumably it suspends until cancelled),
     * then waits for that coroutine to finish.
     */
    @Test
    fun testCancelWithoutValue() = runTest {
        // Launched with its own Job() so it is not a child of the test coroutine.
        val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
            rxObservable<String> {
                hang {  }
            }.awaitFirst()
        }

        job.cancel()
        job.join()
    }

    /**
     * Calls `awaitFirst()` on an observable that completes without emitting, from a coroutine
     * launched with its own [Job]. The resulting [NoSuchElementException] is listed in
     * `unhandled`, i.e. declared to `runTest` as an expected unhandled exception.
     *
     * The numbered `expect`/`finish` calls (helpers not defined in this file) mark the expected
     * order of execution.
     */
    @Test
    fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
        expect(1)
        val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
            rxObservable<String> {
                yield()
                expect(2)
                // Nothing to emit
            }.awaitFirst()
        }

        job.join()
        finish(3)
    }

    /**
     * The producer runs `delay(100)` under `withTimeout(1)`. The test expects
     * `awaitFirstOrNull()` to throw a [CancellationException]: step 3 is only reached in the
     * `catch` branch, and `finish(4)` follows it.
     */
    @Test
    fun testObservableWithTimeout() = runTest {
        val observable = rxObservable<Int> {
            expect(2)
            withTimeout(1) { delay(100) }
        }
        try {
            expect(1)
            observable.awaitFirstOrNull()
        } catch (e: CancellationException) {
            expect(3)
        }
        finish(4)
    }

    /**
     * Collects [observable] and asserts that it emits exactly `1, 2, ..., n` in that order.
     */
    private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
        var last = 0
        observable.collect {
            assertEquals(++last, it)
        }
        assertEquals(n, last)
    }

}
149