1 package kotlinx.coroutines.jdk9 2 3 import kotlinx.coroutines.testing.* 4 import kotlinx.coroutines.* 5 import kotlinx.coroutines.flow.* 6 import org.junit.Test 7 import java.util.concurrent.Flow as JFlow 8 import kotlin.test.* 9 10 class FlowAsPublisherTest : TestBase() { 11 12 @Test testErrorOnCancellationIsReportednull13 fun testErrorOnCancellationIsReported() { 14 expect(1) 15 flow { 16 try { 17 emit(2) 18 } finally { 19 expect(3) 20 throw TestException() 21 } 22 }.asPublisher().subscribe(object : JFlow.Subscriber<Int> { 23 private lateinit var subscription: JFlow.Subscription 24 25 override fun onComplete() { 26 expectUnreached() 27 } 28 29 override fun onSubscribe(s: JFlow.Subscription?) { 30 subscription = s!! 31 subscription.request(2) 32 } 33 34 override fun onNext(t: Int) { 35 expect(t) 36 subscription.cancel() 37 } 38 39 override fun onError(t: Throwable?) { 40 assertIs<TestException>(t) 41 expect(4) 42 } 43 }) 44 finish(5) 45 } 46 47 @Test testCancellationIsNotReportednull48 fun testCancellationIsNotReported() { 49 expect(1) 50 flow { 51 emit(2) 52 }.asPublisher().subscribe(object : JFlow.Subscriber<Int> { 53 private lateinit var subscription: JFlow.Subscription 54 55 override fun onComplete() { 56 expectUnreached() 57 } 58 59 override fun onSubscribe(s: JFlow.Subscription?) { 60 subscription = s!! 61 subscription.request(2) 62 } 63 64 override fun onNext(t: Int) { 65 expect(t) 66 subscription.cancel() 67 } 68 69 override fun onError(t: Throwable?) { 70 expectUnreached() 71 } 72 }) 73 finish(3) 74 } 75 76 @Test <lambda>null77 fun testFlowWithTimeout() = runTest { 78 val publisher = flow<Int> { 79 expect(2) 80 withTimeout(1) { delay(Long.MAX_VALUE) } 81 }.asPublisher() 82 try { 83 expect(1) 84 publisher.awaitFirstOrNull() 85 } catch (e: CancellationException) { 86 expect(3) 87 } 88 finish(4) 89 } 90 } 91