1 /* <lambda>null2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx3 6 7 import kotlinx.coroutines.* 8 import kotlinx.coroutines.reactive.* 9 import org.junit.Test 10 import kotlin.test.* 11 12 class FlowableTest : TestBase() { 13 @Test 14 fun testBasicSuccess() = runBlocking { 15 expect(1) 16 val observable = rxFlowable(currentDispatcher()) { 17 expect(4) 18 send("OK") 19 } 20 expect(2) 21 observable.subscribe { value -> 22 expect(5) 23 assertEquals("OK", value) 24 } 25 expect(3) 26 yield() // to started coroutine 27 finish(6) 28 } 29 30 @Test 31 fun testBasicFailure() = runBlocking { 32 expect(1) 33 val observable = rxFlowable<String>(currentDispatcher()) { 34 expect(4) 35 throw RuntimeException("OK") 36 } 37 expect(2) 38 observable.subscribe({ 39 expectUnreached() 40 }, { error -> 41 expect(5) 42 assertTrue(error is RuntimeException) 43 assertEquals("OK", error.message) 44 }) 45 expect(3) 46 yield() // to started coroutine 47 finish(6) 48 } 49 50 @Test 51 fun testBasicUnsubscribe() = runBlocking { 52 expect(1) 53 val observable = rxFlowable<String>(currentDispatcher()) { 54 expect(4) 55 yield() // back to main, will get cancelled 56 expectUnreached() 57 } 58 expect(2) 59 val sub = observable.subscribe({ 60 expectUnreached() 61 }, { 62 expectUnreached() 63 }) 64 expect(3) 65 yield() // to started coroutine 66 expect(5) 67 sub.dispose() // will cancel coroutine 68 yield() 69 finish(6) 70 } 71 72 @Test 73 fun testNotifyOnceOnCancellation() = runTest { 74 expect(1) 75 val observable = 76 rxFlowable(currentDispatcher()) { 77 expect(5) 78 send("OK") 79 try { 80 delay(Long.MAX_VALUE) 81 } catch (e: CancellationException) { 82 expect(11) 83 } 84 } 85 .doOnNext { 86 expect(6) 87 assertEquals("OK", it) 88 } 89 .doOnCancel { 90 expect(10) // notified once! 91 } 92 expect(2) 93 val job = launch(start = CoroutineStart.UNDISPATCHED) { 94 expect(3) 95 observable.collect { 96 expect(8) 97 assertEquals("OK", it) 98 } 99 } 100 expect(4) 101 yield() // to observable code 102 expect(7) 103 yield() // to consuming coroutines 104 expect(9) 105 job.cancel() 106 job.join() 107 finish(12) 108 } 109 110 @Test 111 fun testFailingConsumer() = runTest { 112 val pub = rxFlowable(currentDispatcher()) { 113 repeat(3) { 114 expect(it + 1) // expect(1), expect(2) *should* be invoked 115 send(it) 116 } 117 } 118 try { 119 pub.collect { 120 throw TestException() 121 } 122 } catch (e: TestException) { 123 finish(3) 124 } 125 } 126 } 127