1 /* 2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.jdk9 6 7 import kotlinx.coroutines.* 8 import kotlinx.coroutines.flow.* 9 import org.junit.Test 10 import java.util.concurrent.Flow as JFlow 11 import kotlin.test.* 12 13 class FlowAsPublisherTest : TestBase() { 14 15 @Test testErrorOnCancellationIsReportednull16 fun testErrorOnCancellationIsReported() { 17 expect(1) 18 flow { 19 try { 20 emit(2) 21 } finally { 22 expect(3) 23 throw TestException() 24 } 25 }.asPublisher().subscribe(object : JFlow.Subscriber<Int> { 26 private lateinit var subscription: JFlow.Subscription 27 28 override fun onComplete() { 29 expectUnreached() 30 } 31 32 override fun onSubscribe(s: JFlow.Subscription?) { 33 subscription = s!! 34 subscription.request(2) 35 } 36 37 override fun onNext(t: Int) { 38 expect(t) 39 subscription.cancel() 40 } 41 42 override fun onError(t: Throwable?) { 43 assertTrue(t is TestException) 44 expect(4) 45 } 46 }) 47 finish(5) 48 } 49 50 @Test testCancellationIsNotReportednull51 fun testCancellationIsNotReported() { 52 expect(1) 53 flow { 54 emit(2) 55 }.asPublisher().subscribe(object : JFlow.Subscriber<Int> { 56 private lateinit var subscription: JFlow.Subscription 57 58 override fun onComplete() { 59 expectUnreached() 60 } 61 62 override fun onSubscribe(s: JFlow.Subscription?) { 63 subscription = s!! 64 subscription.request(2) 65 } 66 67 override fun onNext(t: Int) { 68 expect(t) 69 subscription.cancel() 70 } 71 72 override fun onError(t: Throwable?) { 73 expectUnreached() 74 } 75 }) 76 finish(3) 77 } 78 79 @Test <lambda>null80 fun testFlowWithTimeout() = runTest { 81 val publisher = flow<Int> { 82 expect(2) 83 withTimeout(1) { delay(Long.MAX_VALUE) } 84 }.asPublisher() 85 try { 86 expect(1) 87 publisher.awaitFirstOrNull() 88 } catch (e: CancellationException) { 89 expect(3) 90 } 91 finish(4) 92 } 93 } 94