• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

package kotlinx.coroutines.rx2

import kotlinx.coroutines.testing.*
import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
import kotlin.coroutines.*
import kotlin.test.*

/**
 * Parameterized integration tests for observables created with [rxObservable] and consumed through
 * the suspending `await*` and `collect` extensions.
 *
 * [params] supplies every combination of a [Ctx] value and a `delay` flag; the parameterized runner
 * creates one instance of this class per combination.
 *
 * @param ctx selects how the producer's coroutine context is derived from the context of the test.
 * @param delay when `true`, the producers that consult it suspend via `delay(1)` while producing.
 */
@RunWith(Parameterized::class)
class IntegrationTest(
    private val ctx: Ctx,
    private val delay: Boolean
) : TestBase() {

    /** Strategies for deriving the producer's context from the context of the running test. */
    enum class Ctx {
        /** The supplied context with its [Job] element removed. */
        MAIN {
            override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job)
        },

        /** Always [Dispatchers.Default]; the supplied context is ignored. */
        DEFAULT {
            override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default
        },

        /** Always [Dispatchers.Unconfined]; the supplied context is ignored. */
        UNCONFINED {
            override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined
        };

        /** Maps the test's [context] to the context that is handed to the observable builder. */
        abstract operator fun invoke(context: CoroutineContext): CoroutineContext
    }

    /**
     * An observable that completes without emitting: `awaitFirst`, `awaitLast` and `awaitSingle` are
     * expected to fail with [NoSuchElementException], the `OrDefault`/`OrNull`/`OrElse` variants are
     * expected to yield their fallbacks, and `collect` is expected to visit no element.
     */
    @Test
    fun testEmpty(): Unit = runBlocking {
        val producerContext = ctx(coroutineContext)
        val observable = rxObservable<String>(producerContext) {
            // Nothing is ever sent; the producer only (optionally) suspends before completing.
            delayIfRequested()
        }

        assertFailsWith<NoSuchElementException> { observable.awaitFirst() }

        val orDefault = observable.awaitFirstOrDefault("OK")
        assertEquals("OK", orDefault)

        val orNull = observable.awaitFirstOrNull()
        assertNull(orNull)

        val orElse = observable.awaitFirstOrElse { "ELSE" }
        assertEquals("ELSE", orElse)

        assertFailsWith<NoSuchElementException> { observable.awaitLast() }
        assertFailsWith<NoSuchElementException> { observable.awaitSingle() }

        var collected = 0
        observable.collect { collected++ }
        assertEquals(0, collected)
    }

    /**
     * An observable that emits exactly `"OK"`: every `await*` variant is expected to return that
     * value, and `collect` is expected to visit it exactly once.
     */
    @Test
    fun testSingle() = runBlocking {
        val producerContext = ctx(coroutineContext)
        val observable = rxObservable<String>(producerContext) {
            delayIfRequested()
            send("OK")
        }

        val first = observable.awaitFirst()
        assertEquals("OK", first)

        val orDefault = observable.awaitFirstOrDefault("OK")
        assertEquals("OK", orDefault)

        val orNull = observable.awaitFirstOrNull()
        assertEquals("OK", orNull)

        val orElse = observable.awaitFirstOrElse { "ELSE" }
        assertEquals("OK", orElse)

        val last = observable.awaitLast()
        assertEquals("OK", last)

        val single = observable.awaitSingle()
        assertEquals("OK", single)

        var collected = 0
        observable.collect { element ->
            assertEquals("OK", element)
            collected++
        }
        assertEquals(1, collected)
    }

    /**
     * An observable that emits `1..n`: the "first" variants are expected to return `1`, `awaitLast`
     * is expected to return `n`, and `awaitSingle` is expected to fail with
     * [IllegalArgumentException]. The sequence is then verified both directly and after a round trip
     * through a channel and a flow back into an observable.
     */
    @Test
    fun testNumbers() = runBlocking<Unit> {
        val n = 100 * stressTestMultiplier
        val producerContext = ctx(coroutineContext)
        val observable = rxObservable<Int>(producerContext) {
            (1..n).forEach { value ->
                send(value)
                delayIfRequested()
            }
        }

        val first = observable.awaitFirst()
        assertEquals(1, first)

        val orDefault = observable.awaitFirstOrDefault(0)
        assertEquals(1, orDefault)

        val orNull = observable.awaitFirstOrNull()
        assertEquals(1, orNull)

        val orElse = observable.awaitFirstOrElse { 0 }
        assertEquals(1, orElse)

        val last = observable.awaitLast()
        assertEquals(n, last)

        assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }

        checkNumbers(n, observable)

        // Round trip: observable -> channel -> flow -> observable, then verify the same sequence.
        val channel = observable.toChannel()
        val roundTripped = channel.consumeAsFlow().asObservable(producerContext)
        checkNumbers(n, roundTripped)
        channel.cancel()
    }

    /**
     * Cancels a job that is suspended in `awaitFirst` on an observable whose producer hangs, then
     * waits for that job to finish. The job is launched with its own [Job], undispatched.
     */
    @Test
    fun testCancelWithoutValue() = runTest {
        val awaiting = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
            val neverEmits = rxObservable<String> {
                hang { }
            }
            neverEmits.awaitFirst()
        }

        awaiting.cancelAndJoin()
    }

    /**
     * Calls `awaitFirst` on an observable that completes without emitting, from a job launched with
     * its own [Job]. The `unhandled` predicate passed to `runTest` matches [NoSuchElementException].
     * The `expect`/`finish` indices give the order in which the steps are meant to run.
     */
    @Test
    fun testEmptySingle() = runTest(unhandled = listOf({ error -> error is NoSuchElementException })) {
        expect(1)
        launch(Job(), start = CoroutineStart.UNDISPATCHED) {
            val empty = rxObservable<String> {
                yield()
                expect(2)
                // Completes here without having emitted anything.
            }
            empty.awaitFirst()
        }.join()
        finish(3)
    }

    /**
     * The producer wraps `delay(100)` in `withTimeout(1)`. The consumer awaits the first element and
     * the `catch` branch for [CancellationException] carries step 3 of the `expect`/`finish`
     * sequence, so that branch has to run for the numbering to line up.
     */
    @Test
    fun testObservableWithTimeout() = runTest {
        val timingOut = rxObservable<Int> {
            expect(2)
            withTimeout(1) { delay(100) }
        }

        expect(1)
        try {
            timingOut.awaitFirstOrNull()
        } catch (e: CancellationException) {
            expect(3)
        }
        finish(4)
    }

    /** Suspends via `delay(1)` when this instance was created with `delay = true`; otherwise a no-op. */
    private suspend fun delayIfRequested() {
        if (delay) delay(1)
    }

    /**
     * Collects [observable] and asserts that it emits the consecutive integers starting at `1`, and
     * that the last value seen equals [n] (so nothing is seen when [n] is `0`).
     */
    private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
        var previous = 0
        observable.collect { value ->
            previous += 1
            assertEquals(previous, value)
        }
        assertEquals(n, previous)
    }

    companion object {
        /**
         * Builds the parameter matrix for the runner: each [Ctx] value in declaration order, paired
         * first with `delay = false` and then with `delay = true`.
         */
        @Parameterized.Parameters(name = "ctx={0}, delay={1}")
        @JvmStatic
        fun params(): Collection<Array<Any>> {
            val matrix = mutableListOf<Array<Any>>()
            for (context in Ctx.values()) {
                for (withDelay in listOf(false, true)) {
                    matrix.add(arrayOf<Any>(context, withDelay))
                }
            }
            return matrix
        }
    }
}
149