• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.jdk9
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.flow.*
6 import org.junit.Test
7 import java.util.concurrent.Flow as JFlow
8 import kotlin.test.*
9 
10 class FlowAsPublisherTest : TestBase() {
11 
12     @Test
testErrorOnCancellationIsReportednull13     fun testErrorOnCancellationIsReported() {
14         expect(1)
15         flow {
16             try {
17                 emit(2)
18             } finally {
19                 expect(3)
20                 throw TestException()
21             }
22         }.asPublisher().subscribe(object : JFlow.Subscriber<Int> {
23             private lateinit var subscription: JFlow.Subscription
24 
25             override fun onComplete() {
26                 expectUnreached()
27             }
28 
29             override fun onSubscribe(s: JFlow.Subscription?) {
30                 subscription = s!!
31                 subscription.request(2)
32             }
33 
34             override fun onNext(t: Int) {
35                 expect(t)
36                 subscription.cancel()
37             }
38 
39             override fun onError(t: Throwable?) {
40                 assertIs<TestException>(t)
41                 expect(4)
42             }
43         })
44         finish(5)
45     }
46 
47     @Test
testCancellationIsNotReportednull48     fun testCancellationIsNotReported() {
49         expect(1)
50         flow {
51             emit(2)
52         }.asPublisher().subscribe(object : JFlow.Subscriber<Int> {
53             private lateinit var subscription: JFlow.Subscription
54 
55             override fun onComplete() {
56                 expectUnreached()
57             }
58 
59             override fun onSubscribe(s: JFlow.Subscription?) {
60                 subscription = s!!
61                 subscription.request(2)
62             }
63 
64             override fun onNext(t: Int) {
65                 expect(t)
66                 subscription.cancel()
67             }
68 
69             override fun onError(t: Throwable?) {
70                 expectUnreached()
71             }
72         })
73         finish(3)
74     }
75 
76     @Test
<lambda>null77     fun testFlowWithTimeout() = runTest {
78         val publisher = flow<Int> {
79             expect(2)
80             withTimeout(1) { delay(Long.MAX_VALUE) }
81         }.asPublisher()
82         try {
83             expect(1)
84             publisher.awaitFirstOrNull()
85         } catch (e: CancellationException) {
86             expect(3)
87         }
88         finish(4)
89     }
90 }
91