package kotlinx.coroutines.rx3

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import kotlin.test.*

/**
 * Tests for the [rxFlowable] coroutine builder.
 *
 * Every test numbers its checkpoints with `expect(n)` / `finish(n)` (inherited from [TestBase]),
 * so the statement order inside each test is part of what is being verified.
 */
class FlowableTest : TestBase() {
    /**
     * A single value sent from the builder reaches the subscriber.
     *
     * The checkpoint numbering requires the builder body (4) to run only after the test
     * coroutine has subscribed (2 -> 3) and yielded.
     */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val flowable = rxFlowable(currentDispatcher()) {
            expect(4)
            send("OK")
        }
        expect(2)
        flowable.subscribe { value ->
            expect(5)
            assertEquals("OK", value)
        }
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /**
     * An exception thrown from the builder is delivered to the error callback,
     * and the value callback is never invoked.
     */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val flowable = rxFlowable<String>(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        flowable.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertIs<RuntimeException>(error)
            assertEquals("OK", error.message)
        })
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /**
     * Disposing the subscription while the builder is suspended cancels the builder:
     * the code after its `yield()` must not run, and neither callback may fire.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val flowable = rxFlowable<String>(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        val subscription = flowable.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to started coroutine
        expect(5)
        subscription.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /**
     * Cancelling the collecting job triggers `doOnCancel` (checkpoint 10) and then cancels
     * the suspended builder (checkpoint 11).
     *
     * Each checkpoint index is used once, so a repeated `doOnCancel` notification would
     * break the expected sequence.
     */
    @Test
    fun testNotifyOnceOnCancellation() = runTest {
        expect(1)
        val flowable =
            rxFlowable(currentDispatcher()) {
                expect(5)
                send("OK")
                try {
                    delay(Long.MAX_VALUE)
                } catch (e: CancellationException) {
                    // Deliberately not rethrown: this is the last statement of the builder,
                    // and the test only needs to observe that cancellation arrived here.
                    expect(11)
                }
            }
            .doOnNext {
                expect(6)
                assertEquals("OK", it)
            }
            .doOnCancel {
                expect(10) // notified once!
            }
        expect(2)
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(3)
            flowable.collect {
                expect(8)
                assertEquals("OK", it)
            }
        }
        expect(4)
        yield() // to observable code
        expect(7)
        yield() // to consuming coroutines
        expect(9)
        job.cancel()
        job.join()
        finish(12)
    }

    /**
     * A collector that throws on the first element propagates its exception to the caller
     * of `collect`; the test finishes at checkpoint 3, after the producer's checkpoints 1 and 2.
     */
    @Test
    fun testFailingConsumer() = runTest {
        val flowable = rxFlowable(currentDispatcher()) {
            repeat(3) {
                expect(it + 1) // expect(1), expect(2) *should* be invoked
                send(it)
            }
        }
        try {
            flowable.collect {
                throw TestException()
            }
        } catch (e: TestException) {
            finish(3)
        }
    }
}