• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.reactive.*
11 import org.junit.Test
12 import org.junit.runner.*
13 import org.junit.runners.*
14 import kotlin.coroutines.*
15 import kotlin.test.*
16 
/**
 * Parameterized checks of the `rxObservable` builder together with the suspending
 * `await*` and `collect` extensions applied to the resulting [Observable].
 *
 * Every test is executed once per combination of [Ctx] and the [delay] flag
 * produced by [params].
 */
@RunWith(Parameterized::class)
class IntegrationTest(
    private val ctx: Ctx,
    private val delay: Boolean
) : TestBase() {

    /**
     * Strategy that decides which coroutine context is handed to the observable builder.
     */
    enum class Ctx {
        /** Reuses the supplied context with its [Job] element removed. */
        MAIN {
            override fun invoke(context: CoroutineContext): CoroutineContext =
                context.minusKey(Job)
        },

        /** Ignores the supplied context and returns [Dispatchers.Default]. */
        DEFAULT {
            override fun invoke(context: CoroutineContext): CoroutineContext =
                Dispatchers.Default
        },

        /** Ignores the supplied context and returns [Dispatchers.Unconfined]. */
        UNCONFINED {
            override fun invoke(context: CoroutineContext): CoroutineContext =
                Dispatchers.Unconfined
        };

        /** Maps the caller's [context] to the context the builder should be started with. */
        abstract operator fun invoke(context: CoroutineContext): CoroutineContext
    }

    companion object {
        /**
         * Produces the constructor argument sets: each [Ctx] value paired first with
         * `delay = false` and then with `delay = true`.
         */
        @Parameterized.Parameters(name = "ctx={0}, delay={1}")
        @JvmStatic
        fun params(): Collection<Array<Any>> {
            val combinations = ArrayList<Array<Any>>()
            for (context in Ctx.values()) {
                for (withDelay in listOf(false, true)) {
                    combinations.add(arrayOf<Any>(context, withDelay))
                }
            }
            return combinations
        }
    }

    /**
     * An observable whose builder sends nothing: the strict `await*` variants are expected
     * to fail with [NoSuchElementException], the lenient ones to yield their fallback,
     * and `collect` to see no elements.
     */
    @Test
    fun testEmpty(): Unit = runBlocking {
        val emptySource = rxObservable<String>(ctx(coroutineContext)) {
            // the builder finishes without sending anything
            if (delay) delay(1)
        }
        assertFailsWith<NoSuchElementException> { emptySource.awaitFirst() }
        assertEquals("OK", emptySource.awaitFirstOrDefault("OK"))
        assertNull(emptySource.awaitFirstOrNull())
        assertEquals("ELSE", emptySource.awaitFirstOrElse { "ELSE" })
        assertFailsWith<NoSuchElementException> { emptySource.awaitLast() }
        assertFailsWith<NoSuchElementException> { emptySource.awaitSingle() }
        var emissions = 0
        emptySource.collect {
            emissions += 1
        }
        assertEquals(0, emissions)
    }

    /**
     * An observable that sends exactly one `"OK"`: every `await*` variant is expected to
     * return that value and `collect` to see it exactly once.
     */
    @Test
    fun testSingle() = runBlocking<Unit> {
        val singleSource = rxObservable<String>(ctx(coroutineContext)) {
            if (delay) delay(1)
            send("OK")
        }
        assertEquals("OK", singleSource.awaitFirst())
        assertEquals("OK", singleSource.awaitFirstOrDefault("OK"))
        assertEquals("OK", singleSource.awaitFirstOrNull())
        assertEquals("OK", singleSource.awaitFirstOrElse { "ELSE" })
        assertEquals("OK", singleSource.awaitLast())
        assertEquals("OK", singleSource.awaitSingle())
        var emissions = 0
        singleSource.collect { value ->
            assertEquals("OK", value)
            emissions += 1
        }
        assertEquals(1, emissions)
    }

    /**
     * An observable that sends `1..n`: checks the first/last accessors, that `awaitSingle`
     * is rejected with [IllegalArgumentException], and that the sequence is intact both
     * directly and after a channel -> flow -> observable round trip.
     */
    @Test
    fun testNumbers() = runBlocking<Unit> {
        val count = 100 * stressTestMultiplier
        val numbers = rxObservable<Int>(ctx(coroutineContext)) {
            for (value in 1..count) {
                send(value)
                if (delay) delay(1)
            }
        }
        assertEquals(1, numbers.awaitFirst())
        assertEquals(1, numbers.awaitFirstOrDefault(0))
        assertEquals(1, numbers.awaitFirstOrNull())
        assertEquals(1, numbers.awaitFirstOrElse { 0 })
        assertEquals(count, numbers.awaitLast())
        assertFailsWith<IllegalArgumentException> { numbers.awaitSingle() }
        checkNumbers(count, numbers)
        val channel = numbers.toChannel()
        val roundTripped = channel.consumeAsFlow().asObservable(ctx(coroutineContext))
        checkNumbers(count, roundTripped)
        channel.cancel()
    }

    /**
     * Cancels a job that is suspended in `awaitFirst()` on an observable whose builder
     * calls `hang` and sends nothing, then waits for that job to finish.
     */
    @Test
    fun testCancelWithoutValue() = runTest {
        val waiter = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
            val silentSource = rxObservable<String> {
                hang {  }
            }
            silentSource.awaitFirst()
        }

        waiter.cancelAndJoin()
    }

    /**
     * Calls `awaitFirst()` on an observable that completes without emitting, from a job
     * launched with its own [Job]. The `unhandled` predicate given to `runTest` accepts a
     * [NoSuchElementException].
     */
    @Test
    fun testEmptySingle() = runTest(unhandled = listOf({ cause -> cause is NoSuchElementException })) {
        expect(1)
        val waiter = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
            val emptySource = rxObservable<String> {
                yield()
                expect(2)
                // the builder finishes without emitting
            }
            emptySource.awaitFirst()
        }

        waiter.join()
        finish(3)
    }

    /**
     * The builder body wraps `delay(100)` in `withTimeout(1)`; the awaiting side is expected
     * to observe a [CancellationException] (step 3) before the test finishes (step 4).
     */
    @Test
    fun testObservableWithTimeout() = runTest {
        val timingOut = rxObservable<Int> {
            expect(2)
            withTimeout(1) {
                delay(100)
            }
        }
        try {
            expect(1)
            timingOut.awaitFirstOrNull()
        } catch (expected: CancellationException) {
            expect(3)
        }
        finish(4)
    }

    /**
     * Collects [observable] and asserts that it delivers exactly `1, 2, ..., n` in order.
     */
    private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
        var seen = 0
        observable.collect { value ->
            seen += 1
            assertEquals(seen, value)
        }
        assertEquals(n, seen)
    }

}
152