1 /* <lambda>null2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx2 6 7 import io.reactivex.* 8 import kotlinx.coroutines.* 9 import kotlinx.coroutines.flow.* 10 import kotlinx.coroutines.reactive.* 11 import org.junit.Test 12 import org.junit.runner.* 13 import org.junit.runners.* 14 import kotlin.coroutines.* 15 import kotlin.test.* 16 17 @RunWith(Parameterized::class) 18 class IntegrationTest( 19 private val ctx: Ctx, 20 private val delay: Boolean 21 ) : TestBase() { 22 23 enum class Ctx { 24 MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) }, 25 DEFAULT { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default }, 26 UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined }; 27 28 abstract operator fun invoke(context: CoroutineContext): CoroutineContext 29 } 30 31 companion object { 32 @Parameterized.Parameters(name = "ctx={0}, delay={1}") 33 @JvmStatic 34 fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx -> 35 listOf(false, true).map { delay -> 36 arrayOf(ctx, delay) 37 } 38 } 39 } 40 41 @Test 42 fun testEmpty(): Unit = runBlocking { 43 val observable = rxObservable<String>(ctx(coroutineContext)) { 44 if (delay) delay(1) 45 // does not send anything 46 } 47 assertFailsWith<NoSuchElementException> { observable.awaitFirst() } 48 assertEquals("OK", observable.awaitFirstOrDefault("OK")) 49 assertNull(observable.awaitFirstOrNull()) 50 assertEquals("ELSE", observable.awaitFirstOrElse { "ELSE" }) 51 assertFailsWith<NoSuchElementException> { observable.awaitLast() } 52 assertFailsWith<NoSuchElementException> { observable.awaitSingle() } 53 var cnt = 0 54 observable.collect { 55 cnt++ 56 } 57 assertEquals(0, cnt) 58 } 59 60 @Test 61 fun testSingle() = runBlocking { 62 val observable = rxObservable(ctx(coroutineContext)) { 63 if (delay) delay(1) 64 send("OK") 65 } 66 assertEquals("OK", observable.awaitFirst()) 67 assertEquals("OK", observable.awaitFirstOrDefault("OK")) 68 assertEquals("OK", observable.awaitFirstOrNull()) 69 assertEquals("OK", observable.awaitFirstOrElse { "ELSE" }) 70 assertEquals("OK", observable.awaitLast()) 71 assertEquals("OK", observable.awaitSingle()) 72 var cnt = 0 73 observable.collect { 74 assertEquals("OK", it) 75 cnt++ 76 } 77 assertEquals(1, cnt) 78 } 79 80 @Test 81 fun testNumbers() = runBlocking<Unit> { 82 val n = 100 * stressTestMultiplier 83 val observable = rxObservable(ctx(coroutineContext)) { 84 for (i in 1..n) { 85 send(i) 86 if (delay) delay(1) 87 } 88 } 89 assertEquals(1, observable.awaitFirst()) 90 assertEquals(1, observable.awaitFirstOrDefault(0)) 91 assertEquals(1, observable.awaitFirstOrNull()) 92 assertEquals(1, observable.awaitFirstOrElse { 0 }) 93 assertEquals(n, observable.awaitLast()) 94 assertFailsWith<IllegalArgumentException> { observable.awaitSingle() } 95 checkNumbers(n, observable) 96 val channel = observable.toChannel() 97 checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext))) 98 channel.cancel() 99 } 100 101 @Test 102 fun testCancelWithoutValue() = runTest { 103 val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) { 104 rxObservable<String> { 105 hang { } 106 }.awaitFirst() 107 } 108 109 job.cancel() 110 job.join() 111 } 112 113 @Test 114 fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) { 115 expect(1) 116 val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) { 117 rxObservable<String> { 118 yield() 119 expect(2) 120 // Nothing to emit 121 }.awaitFirst() 122 } 123 124 job.join() 125 finish(3) 126 } 127 128 @Test 129 fun testObservableWithTimeout() = runTest { 130 val observable = rxObservable<Int> { 131 expect(2) 132 withTimeout(1) { delay(100) } 133 } 134 try { 135 expect(1) 136 observable.awaitFirstOrNull() 137 } catch (e: CancellationException) { 138 expect(3) 139 } 140 finish(4) 141 } 142 143 private suspend fun checkNumbers(n: Int, observable: Observable<Int>) { 144 var last = 0 145 observable.collect { 146 assertEquals(++last, it) 147 } 148 assertEquals(n, last) 149 } 150 151 } 152