<lambda>null1package kotlinx.coroutines.rx2 2 3 import kotlinx.coroutines.testing.* 4 import kotlinx.coroutines.* 5 import kotlinx.coroutines.reactive.* 6 import org.junit.* 7 import org.junit.Test 8 import kotlin.test.* 9 10 class FlowableTest : TestBase() { 11 @Test 12 fun testBasicSuccess() = runBlocking { 13 expect(1) 14 val observable = rxFlowable(currentDispatcher()) { 15 expect(4) 16 send("OK") 17 } 18 expect(2) 19 observable.subscribe { value -> 20 expect(5) 21 assertEquals("OK", value) 22 } 23 expect(3) 24 yield() // to started coroutine 25 finish(6) 26 } 27 28 @Test 29 fun testBasicFailure() = runBlocking { 30 expect(1) 31 val observable = rxFlowable<String>(currentDispatcher()) { 32 expect(4) 33 throw RuntimeException("OK") 34 } 35 expect(2) 36 observable.subscribe({ 37 expectUnreached() 38 }, { error -> 39 expect(5) 40 assertIs<RuntimeException>(error) 41 assertEquals("OK", error.message) 42 }) 43 expect(3) 44 yield() // to started coroutine 45 finish(6) 46 } 47 48 @Test 49 fun testBasicUnsubscribe() = runBlocking { 50 expect(1) 51 val observable = rxFlowable<String>(currentDispatcher()) { 52 expect(4) 53 yield() // back to main, will get cancelled 54 expectUnreached() 55 } 56 expect(2) 57 val sub = observable.subscribe({ 58 expectUnreached() 59 }, { 60 expectUnreached() 61 }) 62 expect(3) 63 yield() // to started coroutine 64 expect(5) 65 sub.dispose() // will cancel coroutine 66 yield() 67 finish(6) 68 } 69 70 @Test 71 fun testNotifyOnceOnCancellation() = runTest { 72 expect(1) 73 val observable = 74 rxFlowable(currentDispatcher()) { 75 expect(5) 76 send("OK") 77 try { 78 delay(Long.MAX_VALUE) 79 } catch (e: CancellationException) { 80 expect(11) 81 } 82 } 83 .doOnNext { 84 expect(6) 85 assertEquals("OK", it) 86 } 87 .doOnCancel { 88 expect(10) // notified once! 89 } 90 expect(2) 91 val job = launch(start = CoroutineStart.UNDISPATCHED) { 92 expect(3) 93 observable.collect { 94 expect(8) 95 assertEquals("OK", it) 96 } 97 } 98 expect(4) 99 yield() // to observable code 100 expect(7) 101 yield() // to consuming coroutines 102 expect(9) 103 job.cancel() 104 job.join() 105 finish(12) 106 } 107 108 @Test 109 fun testFailingConsumer() = runTest { 110 val pub = rxFlowable(currentDispatcher()) { 111 repeat(3) { 112 expect(it + 1) // expect(1), expect(2) *should* be invoked 113 send(it) 114 } 115 } 116 try { 117 pub.collect { 118 throw TestException() 119 } 120 } catch (e: TestException) { 121 finish(3) 122 } 123 } 124 } 125